[
https://issues.apache.org/jira/browse/BEAM-12824?focusedWorklogId=667573&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-667573
]
ASF GitHub Bot logged work on BEAM-12824:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Oct/21 09:13
Start Date: 20/Oct/21 09:13
Worklog Time Spent: 10m
Work Description: mbenenso commented on a change in pull request #15434:
URL: https://github.com/apache/beam/pull/15434#discussion_r732571408
##########
File path:
sdks/java/io/deltalake/src/main/java/org/apache/beam/sdk/io/DeltaFileIO.java
##########
@@ -0,0 +1,653 @@
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Transforms for accessing Delta Lake files: listing files (matching) and
reading.
+ *
+ * <h2>Getting snapshot</h2>
+ *
+ * <p>{@link #snapshot} and {@link #matchAll} match filepatterns (respectively
either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that
match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration
options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of
filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns
(watching for new files).
+ *
+ * <h3>Example: Watching a Delta Lake for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds,
continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops
if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ * .filepattern("...")
+ * .continuously(
+ * Duration.standardSeconds(30),
afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a Delta Lake directory for a specific version </h3>
+ *
+ * <p>This example matches the Delta Lake parquet files for version 5.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(DeltaFileIO.snapshot()
+ * .filepattern("...")
+ * .withVersion(5L));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #snapshot} or {@link
#matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally
decompressing it.
+ *
+ * <h3>Example: Returning filenames of parquet files from the latest snapshot
in Delta Lake</h3>
+ *
+ * <pre>{@code
+ pipeline
+ .apply("Get Snapshot",
+ DeltaFileIO.snapshot()
+ .filepattern(filePattern)
+ .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
+ )
+ .apply("Read matched files",
+ DeltaFileIO.readMatches()
+ )
+ .apply("Read parquet files",
+ ParquetIO.readFiles(<SCHEMA>)
+ )
+ * }</pre>
+ *
+ */
+
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class DeltaFileIO
+{
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaFileIO.class);
+
+ /**
+ * Processes a filepattern using Delta Lake Standalone and produces a
collection of parquet files
+ * as {@link MatchResult.Metadata}.
+ *
+ * <p>By default, matches the filepattern once and produces a bounded {@link
PCollection}. To
+ * continuously watch the filepattern for new matches, use {@link
MatchAll#continuously(Duration,
+ * TerminationCondition)} - this will produce an unbounded {@link
PCollection}.
+ *
+ * <p>By default, a filepattern matching no resources is treated according
to {@link
+ * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+ * Match#withEmptyMatchTreatment}.
+ *
+ * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename.
For example, if this
+ * transform observes a file with the same name several times with different
metadata (e.g.
+ * because the file is growing), it will emit the metadata the first time
this file is observed,
+ * and will ignore future changes to this file.
+ */
+ public static Match snapshot() {
+ return new AutoValue_DeltaFileIO_Match.Builder()
+
.setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+ .build();
+ }
+
+ /**
+ * Like {@link #snapshot}, but matches each filepattern in a collection of
filepatterns.
+ *
+ * <p>Resources are not deduplicated between filepatterns, i.e. if the same
resource matches
+ * multiple filepatterns, it will be produced multiple times.
+ *
+ * <p>By default, a filepattern matching no resources is treated according
to {@link
+ * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use
{@link
+ * MatchAll#withEmptyMatchTreatment}.
+ */
+ public static MatchAll matchAll() {
+ return new AutoValue_DeltaFileIO_MatchAll.Builder()
+
.setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+ .build();
+ }
+
+ /**
+ * Converts each result of {@link #snapshot} or {@link #matchAll} to a
{@link ReadableFile} which can
+ * be used to read the contents of each file, optionally decompressing it.
+ */
+ public static ReadMatches readMatches() {
Review comment:
I am trying to make the usage of DeltaFileIO similar to that of FileIO, so a
typical application will do:
pipeline
.apply("Get Snapshot",
DeltaFileIO.snapshot()
.filepattern(filePattern)
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
)
.apply("Read matched files",
DeltaFileIO.readMatches()
)
.apply("Read parquet files",
ParquetIO.readFiles(<SCHEMA>)
)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 667573)
Time Spent: 5h (was: 4h 50m)
> Beam Connector for Reading Data from Delta Lake
> -----------------------------------------------
>
> Key: BEAM-12824
> URL: https://issues.apache.org/jira/browse/BEAM-12824
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Affects Versions: 2.29.0
> Reporter: Michael
> Assignee: Michael
> Priority: P2
> Labels: features
> Time Spent: 5h
> Remaining Estimate: 0h
>
> Beam Connector for Reading Data from Delta Lake
> Initial version is in [https://github.com/mbenenso/beam-deltalake] for the
> connector & [https://github.com/mbenenso/beam-deltalake-example] for the
> usage example.
>
> Consider adding it to the Beam repository.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)