abhiy13 commented on a change in pull request #12645: URL: https://github.com/apache/beam/pull/12645#discussion_r474444099
########## File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java ########## @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.contextualtextio; + +import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.CompressedSource; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.MatchConfiguration; +import org.apache.beam.sdk.io.ReadAllViaFileBasedSource; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import 
org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link PTransform}s that read text files and collect contextual information of the elements in + * the input. + * + * <h2>Reading from text files</h2> + * + * <p>To read a {@link PCollection} from one or more text files, use {@code + * ContextualTextIO.read()}. To instantiate a transform use {@link + * ContextualTextIO.Read#from(String)} and specify the path of the file(s) to be read. + * Alternatively, if the filenames to be read are themselves in a {@link PCollection} you can use + * {@link FileIO} to match them and {@link ContextualTextIO#readFiles()} to read them. 
+ * + * <p>{@link #read} returns a {@link PCollection} of {@link RecordWithMetadata RecordWithMetadata}, + * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', + * '\r', '\r\n', or a specified delimiter; see {@link ContextualTextIO.Read#withDelimiter}) + * + * <h3>Filepattern expansion and watching</h3> + * + * <p>By default, the filepatterns are expanded only once. The combination of {@link + * FileIO.Match#continuously(Duration, TerminationCondition)} and {@link #readFiles()} allows + * streaming of new files matching the filepattern(s). + * + * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readFiles()} + * allows them in case the filepattern contains a glob wildcard character. Use {@link + * ContextualTextIO.Read#withEmptyMatchTreatment} or {@link + * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link #readFiles()} to configure + * this behavior. + * + * <p>Example 1: reading a file or filepattern. + * + * <pre>{@code + * Pipeline p = ...; + * + * // A simple Read of a file: + * PCollection<RecordWithMetadata> lines = p.apply(ContextualTextIO.read().from("/local/path/to/file.txt")); + * }</pre> + * + * <p>Example 2: reading a PCollection of filenames. + * + * <pre>{@code + * Pipeline p = ...; + * + * // E.g. the filenames might be computed from other data in the pipeline, or + * // read from a data source. + * PCollection<String> filenames = ...; + * + * // Read all files in the collection. + * PCollection<RecordWithMetadata> lines = + * filenames + * .apply(FileIO.matchAll()) + * .apply(FileIO.readMatches()) + * .apply(ContextualTextIO.readFiles()); + * }</pre> + * + * <p>Example 3: streaming new files matching a filepattern. 
+ * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<RecordWithMetadata> lines = p.apply(ContextualTextIO.read() + * .from("/local/path/to/files/*") + * .watchForNewFiles( + * // Check for new files every minute + * Duration.standardMinutes(1), + * // Stop watching the filepattern if no new files appear within an hour + * afterTimeSinceNewOutput(Duration.standardHours(1)))); + * }</pre> + * + * <p>Example 4: reading a file or file pattern of RFC4180-compliant CSV files with fields that may + * contain line breaks. + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<RecordWithMetadata> lines = p.apply(ContextualTextIO.read() + * .from("/local/path/to/files/*.csv") + * .withHasMultilineCSVRecords(true)); + * }</pre> + * + * <p>Example 5: reading while watching for new files + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<RecordWithMetadata> lines = p.apply(FileIO.match() + * .filepattern("filepattern") + * .continuously( + * Duration.millis(100), + * Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3)))) + * .apply(FileIO.readMatches()) + * .apply(ContextualTextIO.readFiles()); + * }</pre> + * + * NOTE: Using {@link ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} introduces a + * performance penalty: when this option is enabled, the input cannot be split and read in parallel. + * + * <h3>Reading a very large number of files</h3> + * + * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of + * thousands or more), use {@link ContextualTextIO.Read#withHintMatchesManyFiles} for better + * performance and scalability. Note that it may decrease performance if the filepattern matches + * only a small number of files. + */ +public class ContextualTextIO { + private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L; + + /** + * A {@link PTransform} that reads from one or more text files and returns a bounded {@link Review comment: Added the example. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
