[
https://issues.apache.org/jira/browse/BEAM-10124?focusedWorklogId=470270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-470270
]
ASF GitHub Bot logged work on BEAM-10124:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Aug/20 14:59
Start Date: 13/Aug/20 14:59
Worklog Time Spent: 10m
Work Description: tvalentyn commented on a change in pull request #12490:
URL: https://github.com/apache/beam/pull/12490#discussion_r470015854
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIO.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.CompressedSource;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform}s that read text files and collect contextual information
of the elements in
+ * the input.
+ *
+ * <h2>Reading from text files</h2>
+ *
+ * <p>To read a {@link PCollection} from one or more text files, use {@code
+ * ContextualTextIO.read()}. To instantiate a transform use {@link
+ * ContextualTextIO.Read#from(String)} and specify the path of the file(s) to
be read.
+ * Alternatively, if the filenames to be read are themselves in a {@link
PCollection} you can use
+ * {@link FileIO} to match them and {@link ContextualTextIO#readFiles()} to
read them.
+ *
+ * <p>{@link #read} returns a {@link PCollection} of {@link LineContext
LineContext}, each
+ * corresponding to one line of an input UTF-8 text file (split into lines
delimited by '\n', '\r',
+ * '\r\n', or specified delimiter see {@link
ContextualTextIO.Read#withDelimiter})
+ *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, the filepatterns are expanded only once. The combination of
{@link
+ * FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
#readFiles()} allow
+ * streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files,
and {@link #readFiles()}
+ * allows them in case the filepattern contains a glob wildcard character. Use
{@link
+ * ContextualTextIO.Read#withEmptyMatchTreatment} or {@link
+ * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link
#readFiles()} to configure
+ * this behavior.
+ *
+ * <p>Example 1: reading a file or filepattern.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // A simple Read of a local file (only runs locally when the filepath is on
system):
+ * PCollection<LineContext> lines =
p.apply(ContextualTextIO.read().from("/local/path/to/file.txt"));
+ * }</pre>
+ *
+ * <p>Example 2: reading a PCollection of filenames.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // E.g. the filenames might be computed from other data in the pipeline, or
+ * // read from a data source.
+ * PCollection<String> filenames = ...;
+ *
+ * // Read all files in the collection.
+ * PCollection<LineContext> lines =
+ * filenames
+ * .apply(FileIO.matchAll())
+ * .apply(FileIO.readMatches())
+ * .apply(ContextualTextIO.readFiles());
+ * }</pre>
+ *
+ * <p>Example 3: streaming new files matching a filepattern.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<LineContext> lines = p.apply(ContextualTextIO.read()
+ * .from("/local/path/to/files/*")
+ * .watchForNewFiles(
+ * // Check for new files every minute
+ * Duration.standardMinutes(1),
+ * // Stop watching the filepattern if no new files appear within an hour
+ * afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <p>Example 4: reading a file or file pattern of RFC4180-compliant CSV files
with fields that may
+ * contain line breaks.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<LineContext> lines = p.apply(ContextualTextIO.read()
+ * .from("/local/path/to/files/*.csv")
+ * .withHasRFC4180MultiLineColumn(true));
+ * }</pre>
+ *
+ * <p>Example 5: reading while watching for new files
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<LineContext> lines = p.apply(FileIO.match()
+ * .filepattern("filepattern")
+ * .continuously(
+ * Duration.millis(100),
+ * Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(3))))
+ * .apply(FileIO.readMatches())
+ * .apply(ContextualTextIO.readFiles());
+ * }</pre>
+ *
+ * NOTE: Using {@link
ContextualTextIO.Read#withHasRFC4180MultiLineColumn(boolean)} introduces a
+ * performance penalty: when this option is enabled, the input cannot be split
and read in parallel.
+ *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of
files (e.g. tens of
+ * thousands or more), use {@link
ContextualTextIO.Read#withHintMatchesManyFiles} for better
+ * performance and scalability. Note that it may decrease performance if the
filepattern matches
+ * only a small number of files.
+ */
+public class ContextualTextIO {
+ private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
+
+ /**
+ * A {@link PTransform} that reads from one or more text files and returns a
bounded {@link
+ * PCollection} containing one {@link LineContext} element for each line of
the input files.
+ */
+ public static Read read() {
+ return new AutoValue_ContextualTextIO_Read.Builder()
+ .setCompression(Compression.AUTO)
+ .setHintMatchesManyFiles(false)
+
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+ .setHasRFC4180MultiLineColumn(false)
+ .build();
+ }
+
+ /**
+ * Like {@link #read}, but reads each file in a {@link PCollection} of {@link
+ * FileIO.ReadableFile}, returned by {@link FileIO#readMatches}.
+ */
+ public static ReadFiles readFiles() {
+ return new AutoValue_ContextualTextIO_ReadFiles.Builder()
+ // 64MB is a reasonable value that allows to amortize the cost of
opening files,
+ // but is not so large as to exhaust a typical runner's maximum amount
of output per
+ // ProcessElement call.
+ .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+ .setHasRFC4180MultiLineColumn(false)
+ .build();
+ }
+
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin,
PCollection<LineContext>> {
+ abstract @Nullable ValueProvider<String> getFilepattern();
+
+ abstract MatchConfiguration getMatchConfiguration();
+
+ abstract boolean getHintMatchesManyFiles();
+
+ abstract Compression getCompression();
+
+ abstract @Nullable Boolean getHasRFC4180MultiLineColumn();
+
+ @SuppressWarnings("mutable") // this returns an array that can be mutated
by the caller
+ abstract @Nullable byte[] getDelimiter();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFilepattern(ValueProvider<String> filepattern);
+
+ abstract Builder setMatchConfiguration(MatchConfiguration
matchConfiguration);
+
+ abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+
+ abstract Builder setCompression(Compression compression);
+
+ abstract Builder setDelimiter(byte[] delimiter);
+
+ abstract Builder setHasRFC4180MultiLineColumn(Boolean
hasRFC4180MultiLineColumn);
+
+ abstract Read build();
+ }
+
+ /**
+ * Reads text from the file(s) with the given filename or filename pattern.
+ *
+ * <p>This can be a local path (if running locally), or a Google Cloud
Storage filename or
+ * filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if
running locally or using
+ * remote execution service).
+ *
+ * <p>Standard <a
href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+ * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
+ *
+ * <p>If it is known that the filepattern will match a very large number
of files (at least tens
+ * of thousands), use {@link #withHintMatchesManyFiles} for better
performance and scalability.
+ */
+ public Read from(String filepattern) {
+ checkArgument(filepattern != null, "filepattern can not be null");
+ return from(StaticValueProvider.of(filepattern));
+ }
+
+ /** Same as {@code from(filepattern)}, but accepting a {@link
ValueProvider}. */
+ public Read from(ValueProvider<String> filepattern) {
+ checkArgument(filepattern != null, "filepattern can not be null");
+ return toBuilder().setFilepattern(filepattern).build();
+ }
+
+ /** Sets the {@link MatchConfiguration}. */
+ public Read withMatchConfiguration(MatchConfiguration matchConfiguration) {
+ return toBuilder().setMatchConfiguration(matchConfiguration).build();
+ }
+
+ /**
+ * When reading RFC4180 CSV files that have values that span multiple
lines, set this to true.
+ * Note: this reduces the read performance (see: {@link ContextualTextIO}).
+ */
+ public Read withRFC4180MultiLineColumn(Boolean hasRFC4180MultiLineColumn) {
Review comment:
@abhiy13 @rezarokni what do you think about replacing
`withHasRFC4180MultiLineColumn` with
`withMultilineCSV`
and adding something like:
```withMultilineCSV allows reading CSV files with multiline fields, however
it disables read parallelism. This setting requires that input values with line
breaks be enclosed in double quotes, and that double quotes in values be escaped
with a preceding double quote, as defined in RFC4180.```
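To illustrate the quoting rule in the suggested wording, here is a minimal sketch in plain Java. It is not code from this pull request: the class `Rfc4180RecordSplitter` and the method `splitRecords` are hypothetical names, and the sketch assumes the whole input fits in a single string. It also hints at why parallel reads are disabled, since deciding whether a line break ends a record requires knowing the quote state from the start of the file.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the PR): splits text into RFC4180-style records. */
public class Rfc4180RecordSplitter {

  /**
   * Splits the input into records. A line break ends a record only when it
   * appears outside a double-quoted field.
   */
  public static List<String> splitRecords(String input) {
    List<String> records = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    boolean inQuotes = false;
    for (int i = 0; i < input.length(); i++) {
      char c = input.charAt(i);
      if (c == '"') {
        // An escaped quote ("") toggles the state twice, so it leaves it unchanged.
        inQuotes = !inQuotes;
        current.append(c);
      } else if ((c == '\n' || c == '\r') && !inQuotes) {
        // Treat "\r\n" as a single record terminator.
        if (c == '\r' && i + 1 < input.length() && input.charAt(i + 1) == '\n') {
          i++;
        }
        records.add(current.toString());
        current.setLength(0);
      } else {
        // Regular characters, including line breaks inside quotes, stay in the record.
        current.append(c);
      }
    }
    // Emit a trailing record that has no terminator.
    if (current.length() > 0) {
      records.add(current.toString());
    }
    return records;
  }

  public static void main(String[] args) {
    String input = "\"asdf\nhjkl\"\nxyz\n";
    for (String record : splitRecords(input)) {
      System.out.println("[" + record + "]");
    }
  }
}
```

Because the quote state at any offset depends on every preceding character, a reader that starts in the middle of a file cannot tell whether it is inside a quoted field.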
##########
File path:
sdks/java/io/contextual-text-io/src/test/java/org/apache/beam/sdk/io/ContextualTextIO/ContextualTextIOTest.java
##########
@@ -0,0 +1,1271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.ContextualTextIO;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
+import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
+import static org.apache.beam.sdk.io.Compression.AUTO;
+import static org.apache.beam.sdk.io.Compression.BZIP2;
+import static org.apache.beam.sdk.io.Compression.DEFLATE;
+import static org.apache.beam.sdk.io.Compression.GZIP;
+import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
+import static org.apache.beam.sdk.io.Compression.ZIP;
+import static
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assume.assumeFalse;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Writer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+
+/** Tests for {@link ContextualTextIO.Read} */
+public class ContextualTextIOTest {
+ private static final int NUM_LINES_FOR_LARGE = 1024;
+
+ private static final List<String> EMPTY = Collections.emptyList();
+
+ private static final List<String> TINY = Arrays.asList("ABC", "DEF", "HIJ");
+
+ private static final List<String> LARGE = makeLines(NUM_LINES_FOR_LARGE);
+
+ private static File writeToFile(
+ List<String> lines, TemporaryFolder folder, String fileName, Compression
compression)
+ throws IOException {
+ File file = folder.getRoot().toPath().resolve(fileName).toFile();
+ OutputStream output = new FileOutputStream(file);
+ switch (compression) {
+ case UNCOMPRESSED:
+ break;
+ case GZIP:
+ output = new GZIPOutputStream(output);
+ break;
+ case BZIP2:
+ output = new BZip2CompressorOutputStream(output);
+ break;
+ case ZIP:
+ ZipOutputStream zipOutput = new ZipOutputStream(output);
+ zipOutput.putNextEntry(new ZipEntry("entry"));
+ output = zipOutput;
+ break;
+ case DEFLATE:
+ output = new DeflateCompressorOutputStream(output);
+ break;
+ default:
+ throw new UnsupportedOperationException(compression.toString());
+ }
+ writeToStreamAndClose(lines, output);
+ return file;
+ }
+
+ /**
+ * Helper that writes the given lines (adding a newline in between) to a
stream, then closes the
+ * stream.
+ */
+ private static void writeToStreamAndClose(List<String> lines, OutputStream
outputStream) {
+ try (PrintStream writer = new PrintStream(outputStream)) {
+ for (String line : lines) {
+ writer.println(line);
+ }
+ }
+ }
+
+ /** Helper to make a list of compressible strings. Returns ["Line " + i] for
i in range(0, n). */
+ private static List<String> makeLines(int n) {
+ List<String> lines = new ArrayList<>();
+ for (int i = 0; i < n; ++i) {
+ lines.add("Line " + i);
+ }
+ return lines;
+ }
+
+ private static class convertLineContextToString extends DoFn<LineContext,
String> {
+ @ProcessElement
+ public void processElement(@Element LineContext L, OutputReceiver<String>
out) {
+ String file = L.getFile().substring(L.getFile().lastIndexOf('/') + 1);
+ out.output(file + " " + L.getLineNum() + " " + L.getLine());
+ }
+ }
+
+ /**
+ * Helper method that runs a variety of ways to read a single file using
ContextualTextIO and
+ * checks that they all match the given expected output.
+ *
+ * <p>The transforms being verified are:
+ *
+ * <ul>
+ *
<li>ContextualTextIO.read().from(filename).withCompression(compressionType).withHintMatchesManyFiles()
+ *
<li>ContextualTextIO.read().from(filename).withCompression(compressionType)
+ *
<li>ContextualTextIO.read().from(filename).withCompression(compressionType).with
+ * <li>ContextualTextIO.readFiles().withCompression(compressionType)
+ * </ul>
+ */
+ private static void assertReadingCompressedFileMatchesExpected(
+ File file, Compression compression, List<String> expected, Pipeline p) {
+
+ ContextualTextIO.Read read =
+
ContextualTextIO.read().from(file.getPath()).withCompression(compression);
+
+ // Convert the expected output into LineContext output Format
+ List<String> expectedOutput = new ArrayList<>();
+ for (int lineNum = 0; lineNum < expected.size(); ++lineNum) {
+ expectedOutput.add(file.getName() + " " + lineNum + " " +
expected.get(lineNum));
+ }
+
+ PAssert.that(
+ p.apply("Read_" + file + "_" + compression.toString(), read)
+ .apply("ConvertLineContextToString", ParDo.of(new
convertLineContextToString())))
+ .containsInAnyOrder(expectedOutput);
+ PAssert.that(
+ p.apply(
+ "Read_" + file + "_" + compression.toString() + "_many",
+ read.withHintMatchesManyFiles())
+ .apply(
+ "ConvertLineContextToString" + "_many",
+ ParDo.of(new convertLineContextToString())))
+ .containsInAnyOrder(expectedOutput);
+
+ PAssert.that(
+ p.apply(
+ "Read_" + file + "_" + compression.toString() +
"_withRFC4180",
+ read.withHasRFC4180MultiLineColumn(true))
+ .apply(
+ "ConvertLineContextToString" + "_withRFC4180",
+ ParDo.of(new convertLineContextToString())))
+ .containsInAnyOrder(expectedOutput);
+
+ PAssert.that(
+ p.apply("Create_Paths_ReadFiles_" + file,
Create.of(file.getPath()))
+ .apply("Match_" + file, FileIO.matchAll())
+ .apply("ReadMatches_" + file,
FileIO.readMatches().withCompression(compression))
+ .apply("ReadFiles_" + compression.toString(),
ContextualTextIO.readFiles())
+ .apply(
+ "ConvertLineContextToStringWithFileIO",
+ ParDo.of(new convertLineContextToString())))
+ .containsInAnyOrder(expectedOutput);
+ }
+
+ /**
+ * Create a zip file with the given lines.
+ *
+ * @param expected A list of expected lines, populated in the zip file.
+ * @param folder A temporary folder used to create files.
+ * @param filename Optional zip file name (can be null).
+ * @param fieldsEntries Fields to write in zip entries.
+ * @return The zip filename.
+ * @throws Exception In case of a failure during zip file creation.
+ */
+ private static File createZipFile(
+ List<String> expected, TemporaryFolder folder, String filename,
String[]... fieldsEntries)
+ throws Exception {
+ File tmpFile = folder.getRoot().toPath().resolve(filename).toFile();
+
+ ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
+ PrintStream writer = new PrintStream(out, true /* auto-flush on write */);
+
+ int index = 0;
+ for (String[] entry : fieldsEntries) {
+ out.putNextEntry(new ZipEntry(Integer.toString(index)));
+ for (String field : entry) {
+ writer.println(field);
+ expected.add(field);
+ }
+ out.closeEntry();
+ index++;
+ }
+
+ writer.close();
+ out.close();
+
+ return tmpFile;
+ }
+
+ private static ContextualTextIOSource prepareSource(
+ TemporaryFolder temporaryFolder, byte[] data, byte[] delimiter, boolean
hasRFC4180Multiline)
+ throws IOException {
+ Path path = temporaryFolder.newFile().toPath();
+ Files.write(path, data);
+ return new ContextualTextIOSource(
+ ValueProvider.StaticValueProvider.of(path.toString()),
+ EmptyMatchTreatment.DISALLOW,
+ delimiter,
+ hasRFC4180Multiline);
+ }
+
+ private static String getFileSuffix(Compression compression) {
+ switch (compression) {
+ case UNCOMPRESSED:
+ return ".txt";
+ case GZIP:
+ return ".gz";
+ case BZIP2:
+ return ".bz2";
+ case ZIP:
+ return ".zip";
+ case DEFLATE:
+ return ".deflate";
+ default:
+ return "";
+ }
+ }
+ /** Tests for reading files of different sizes with various compression types.
*/
+ @RunWith(Parameterized.class)
+ public static class CompressedReadTest {
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+ @Rule public TestPipeline p = TestPipeline.create();
+
+ @Parameterized.Parameters(name = "{index}: {1}")
+ public static Iterable<Object[]> data() {
+ return ImmutableList.<Object[]>builder()
+ .add(new Object[] {EMPTY, UNCOMPRESSED})
+ .add(new Object[] {EMPTY, GZIP})
+ .add(new Object[] {EMPTY, BZIP2})
+ .add(new Object[] {EMPTY, ZIP})
+ .add(new Object[] {EMPTY, DEFLATE})
+ .add(new Object[] {TINY, UNCOMPRESSED})
+ .add(new Object[] {TINY, GZIP})
+ .add(new Object[] {TINY, BZIP2})
+ .add(new Object[] {TINY, ZIP})
+ .add(new Object[] {TINY, DEFLATE})
+ .add(new Object[] {LARGE, UNCOMPRESSED})
+ .add(new Object[] {LARGE, GZIP})
+ .add(new Object[] {LARGE, BZIP2})
+ .add(new Object[] {LARGE, ZIP})
+ .add(new Object[] {LARGE, DEFLATE})
+ .build();
+ }
+
+ @Parameterized.Parameter(0)
+ public List<String> lines;
+
+ @Parameterized.Parameter(1)
+ public Compression compression;
+
+ /** Tests reading from a small, compressed file with no extension. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCompressedReadWithoutExtension() throws Exception {
+ String fileName = lines.size() + "_" + compression + "_no_extension";
+ File fileWithNoExtension = writeToFile(lines, tempFolder, fileName,
compression);
+ assertReadingCompressedFileMatchesExpected(fileWithNoExtension,
compression, lines, p);
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCompressedReadWithExtension() throws Exception {
+ String fileName =
+ lines.size() + "_" + compression + "_no_extension" +
getFileSuffix(compression);
+ File fileWithExtension = writeToFile(lines, tempFolder, fileName,
compression);
+
+ // Sanity check that we're properly testing compression.
+ if (lines.size() == NUM_LINES_FOR_LARGE &&
!compression.equals(UNCOMPRESSED)) {
+ File uncompressedFile = writeToFile(lines, tempFolder, "large.txt",
UNCOMPRESSED);
+ assertThat(uncompressedFile.length(),
greaterThan(fileWithExtension.length()));
+ }
+
+ assertReadingCompressedFileMatchesExpected(fileWithExtension,
compression, lines, p);
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testReadWithAuto() throws Exception {
+ // Files with non-compressed extensions should work in AUTO and
UNCOMPRESSED modes.
+ String fileName =
+ lines.size() + "_" + compression + "_no_extension" +
getFileSuffix(compression);
+ File fileWithExtension = writeToFile(lines, tempFolder, fileName,
compression);
+ assertReadingCompressedFileMatchesExpected(fileWithExtension, AUTO,
lines, p);
+ p.run();
+ }
+ }
+
+ /** Tests for reading files with various delimiters. */
+ @RunWith(Parameterized.class)
+ public static class ReadWithDelimiterTest {
+ private static final ImmutableList<String> EXPECTED =
ImmutableList.of("asdf", "hjkl", "xyz");
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Iterable<Object[]> data() {
+ return ImmutableList.<Object[]>builder()
+ // .add(new Object[] {"\n\n\n", ImmutableList.of("", "",
"")})
+ .add(new Object[] {"asdf\nhjkl\nxyz\n", EXPECTED})
+ .add(new Object[] {"asdf\rhjkl\rxyz\r", EXPECTED})
+ .add(new Object[] {"asdf\r\nhjkl\r\nxyz\r\n", EXPECTED})
+ .add(new Object[] {"asdf\rhjkl\r\nxyz\n", EXPECTED})
+ .add(new Object[] {"asdf\nhjkl\nxyz", EXPECTED})
+ .add(new Object[] {"asdf\rhjkl\rxyz", EXPECTED})
+ .add(new Object[] {"asdf\r\nhjkl\r\nxyz", EXPECTED})
+ .add(new Object[] {"asdf\rhjkl\r\nxyz", EXPECTED})
+ .build();
+ }
+
+ @Parameterized.Parameter(0)
+ public String line;
+
+ @Parameterized.Parameter(1)
+ public ImmutableList<String> expected;
+
+ @Test
+ public void testReadLinesWithDelimiter() throws Exception {
+ runTestReadWithData(line.getBytes(UTF_8), expected);
+ }
+
+ private ContextualTextIOSource prepareSource(byte[] data, boolean
hasRFC4180Multiline)
+ throws IOException {
+ return ContextualTextIOTest.prepareSource(tempFolder, data, null,
hasRFC4180Multiline);
+ }
+
+ private void runTestReadWithData(byte[] data, List<String>
expectedResults) throws Exception {
+ ContextualTextIOSource source = prepareSource(data, false);
+ List<LineContext> actual =
+ SourceTestUtils.readFromSource(source,
PipelineOptionsFactory.create());
+ List<String> actualOutput = new ArrayList<>();
+ actual.forEach(
+ (LineContext L) -> {
+ String file = L.getFile().substring(L.getFile().lastIndexOf('/') +
1);
+ actualOutput.add(L.getLine());
+ });
+ assertThat(
+ actualOutput,
+ containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new
String[0])));
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class ReadWithDelimiterAndRFC4180 {
+ static final ImmutableList<String> Expected =
ImmutableList.of("\"asdf\nhjkl\nmnop\"", "xyz");
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Iterable<Object[]> data() {
+ return ImmutableList.<Object[]>builder()
+ .add(new Object[] {"\n\n\n", ImmutableList.of("", "", "")})
+ .add(new Object[] {"\"asdf\nhjkl\"\nxyz\n",
ImmutableList.of("\"asdf\nhjkl\"", "xyz")})
+ .add(new Object[] {"\"asdf\nhjkl\nmnop\"\nxyz\n", Expected})
+ .add(new Object[] {"\"asdf\nhjkl\nmnop\"\nxyz\r", Expected})
+ .add(new Object[] {"\"asdf\nhjkl\nmnop\"\r\nxyz\n", Expected})
+ .add(new Object[] {"\"asdf\nhjkl\nmnop\"\r\nxyz\r\n", Expected})
+ .add(new Object[] {"\"asdf\nhjkl\nmnop\"\rxyz\r\n", Expected})
+ .build();
+ }
+
+ @Parameterized.Parameter(0)
+ public String line;
+
+ @Parameterized.Parameter(1)
+ public ImmutableList<String> expected;
+
+ @Test
+ public void testReadLinesWithDelimiter() throws Exception {
+ runTestReadWithData(line.getBytes(UTF_8), expected);
+ }
+
+ private ContextualTextIOSource prepareSource(byte[] data, boolean
hasRFC4180Multiline)
+ throws IOException {
+ return ContextualTextIOTest.prepareSource(tempFolder, data, null,
hasRFC4180Multiline);
+ }
+
+ private void runTestReadWithData(byte[] data, List<String>
expectedResults) throws Exception {
+ ContextualTextIOSource source = prepareSource(data, true);
+ List<LineContext> actual =
+ SourceTestUtils.readFromSource(source,
PipelineOptionsFactory.create());
+ List<String> actualOutput = new ArrayList<>();
+ actual.forEach(
+ (LineContext L) -> {
+ String file = L.getFile().substring(L.getFile().lastIndexOf('/') +
1);
+ actualOutput.add(L.getLine());
+ });
+ assertThat(
+ actualOutput,
+ containsInAnyOrder(new ArrayList<>(expectedResults).toArray(new
String[0])));
+ }
+ }
+
+ /** Tests Specific for checking functionality of ContextualTextIO */
+ @RunWith(JUnit4.class)
+ public static class ContextualTextIOSpecificTests {
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+ @Rule public TestPipeline p = TestPipeline.create();
+
+ public static final char CR = (char) 0x0D;
+ public static final char LF = (char) 0x0A;
+
+ public static final String CRLF = "" + CR + LF;
+
+ public String createFiles(List<String> input) throws Exception {
Review comment:
nit: createFile
Issue Time Tracking
-------------------
Worklog Id: (was: 470270)
Remaining Estimate: 1,339h 50m (was: 1,340h)
Time Spent: 4h 10m (was: 4h)
> ContextualTextIO - An IO that provides metadata about the line.
> -------------------------------------------------------------------
>
> Key: BEAM-10124
> URL: https://issues.apache.org/jira/browse/BEAM-10124
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Reza ardeshir rokni
> Priority: P2
> Original Estimate: 1,344h
> Time Spent: 4h 10m
> Remaining Estimate: 1,339h 50m
>
> There are two important source IOs for dealing with text files:
> FileIO and TextIO. When requirements go beyond the scope of TextIO, we ask
> the end user to fall back to FileIO and handle the rest on their own.
> We want to improve this experience by creating a more feature-rich TextIO
> that can return, along with each line of data, contextual information about
> the line: for example, the file it came from and the ordinal position of
> the line within the file.
> We would also like to add more formal support for CSV
> files by allowing compliance with RFC4180. Specifically, the RFC allows
> line breaks (CRLF) within fields that are enclosed in double quotes.
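The contextual information described in the issue can be pictured with a small plain-Java sketch. This is only an illustration under stated assumptions, not the Beam implementation: `LineWithContextSketch`, `LineWithContext`, and `readWithContext` are hypothetical names, the file contents are passed in as a string, and line numbers are assumed to be zero-based, as the tests in the pull request expect.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: pairs each line with its file name and position. */
public class LineWithContextSketch {

  /** Hypothetical value type: a line, the file it came from, and its zero-based position. */
  public static final class LineWithContext {
    public final String file;
    public final long lineNum;
    public final String line;

    LineWithContext(String file, long lineNum, String line) {
      this.file = file;
      this.lineNum = lineNum;
      this.line = line;
    }

    @Override
    public String toString() {
      return file + " " + lineNum + " " + line;
    }
  }

  /** Splits contents on '\n', '\r', or "\r\n" and attaches context to each line. */
  public static List<LineWithContext> readWithContext(String fileName, String contents) {
    List<String> lines =
        new BufferedReader(new StringReader(contents)).lines().collect(Collectors.toList());
    List<LineWithContext> out = new ArrayList<>();
    for (int i = 0; i < lines.size(); i++) {
      out.add(new LineWithContext(fileName, i, lines.get(i)));
    }
    return out;
  }

  public static void main(String[] args) {
    for (LineWithContext l : readWithContext("tiny.txt", "ABC\nDEF\nHIJ\n")) {
      System.out.println(l);
    }
  }
}
```

In a distributed read the ordinal position is harder to compute than in this single-threaded sketch, because each worker sees only part of a file.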