abhiy13 commented on a change in pull request #12645:
URL: https://github.com/apache/beam/pull/12645#discussion_r474446145
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
}
@Override
- public PCollection<String> expand(PBegin input) {
- checkNotNull(getFilepattern(), "need to set the filepattern of a
TextIO.Read transform");
+ public PCollection<RecordWithMetadata> expand(PBegin input) {
+ checkNotNull(
+ getFilepattern(), "need to set the filepattern of a
ContextualTextIO.Read transform");
+ PCollection<RecordWithMetadata> lines = null;
if (getMatchConfiguration().getWatchInterval() == null &&
!getHintMatchesManyFiles()) {
- return input.apply("Read",
org.apache.beam.sdk.io.Read.from(getSource()));
+ lines = input.apply("Read",
org.apache.beam.sdk.io.Read.from(getSource()));
+ } else {
+ // All other cases go through FileIO + ReadFiles
+ lines =
+ input
+ .apply(
+ "Create filepattern", Create.ofProvider(getFilepattern(),
StringUtf8Coder.of()))
+ .apply("Match All",
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+ .apply(
+ "Read Matches",
+ FileIO.readMatches()
+ .withCompression(getCompression())
+ .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+ .apply("Via ReadFiles",
readFiles().withDelimiter(getDelimiter()));
}
- // All other cases go through FileIO + ReadFiles
- return input
- .apply("Create filepattern", Create.ofProvider(getFilepattern(),
StringUtf8Coder.of()))
- .apply("Match All",
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
- .apply(
- "Read Matches",
- FileIO.readMatches()
- .withCompression(getCompression())
- .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+ // Check if the user decided to opt out of recordNums associated with
records
+ if (getWithoutLineNumMetadata()) {
+ return lines;
+ }
+
+ // At this point the line number in RecordWithMetadata contains the
relative line offset from
+ // the
+ // beginning of the read range.
+
+ // To compute the absolute position from the beginning of the input,
+ // we group the lines within the same ranges, and evaluate the size of
each range.
+
+ PCollection<KV<KV<String, Long>, RecordWithMetadata>>
linesGroupedByFileAndRange =
+ lines.apply("AddFileNameAndRange", ParDo.of(new
AddFileNameAndRange()));
+
+ PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+ linesGroupedByFileAndRange
+ .apply("CountLinesForEachFileRange", Count.perKey())
+ .apply("SizesAsView", View.asMap());
+
+ // Get Pipeline to create a dummy PCollection with one element to help
compute the lines
+ // before each Range
+ PCollection<Integer> singletonPcoll =
+ input.getPipeline().apply("CreateSingletonPcoll",
Create.of(Arrays.asList(1)));
+
+ // For each (File, Offset) pair, calculate the number of lines occurring
before the Range for
+ // each File
+
+ // After computing the number of lines before each range, we can find
the line number in
+ // original file as numLiesBeforeOffset + lineNumInCurrentOffset
+ PCollectionView<Map<KV<String, Long>, Long>> numLinesBeforeEachRange =
+ singletonPcoll
+ .apply(
+ "ComputeLinesBeforeRange",
+ ParDo.of(new
ComputeLinesBeforeEachRange(rangeSizes)).withSideInputs(rangeSizes))
+ .apply("NumLinesBeforeEachRangeAsView", View.asMap());
+
+ return linesGroupedByFileAndRange.apply(
+ "AssignLineNums",
+ ParDo.of(new AssignLineNums(numLinesBeforeEachRange))
+ .withSideInputs(numLinesBeforeEachRange));
+ }
+
+ @VisibleForTesting
+ static class AddFileNameAndRange
+ extends DoFn<RecordWithMetadata, KV<KV<String, Long>,
RecordWithMetadata>> {
+ @ProcessElement
+ public void processElement(
+ @Element RecordWithMetadata line,
+ OutputReceiver<KV<KV<String, Long>, RecordWithMetadata>> out) {
+ out.output(KV.of(KV.of(line.getFileName(),
line.getRange().getRangeNum()), line));
+ }
+ }
+
+ /**
+ * Helper class for computing number of lines in the File preceding the
beginning of the Range
+ * in this file.
+ */
+ @VisibleForTesting
+ static class ComputeLinesBeforeEachRange extends DoFn<Integer,
KV<KV<String, Long>, Long>> {
+ private final PCollectionView<Map<KV<String, Long>, Long>> rangeSizes;
+
+ public ComputeLinesBeforeEachRange(PCollectionView<Map<KV<String, Long>,
Long>> rangeSizes) {
+ this.rangeSizes = rangeSizes;
+ }
+
+ // Add custom comparator as KV<K, V> is not comparable by default
+ private static class FileRangeComparator<K extends Comparable<K>, V
extends Comparable<V>>
+ implements Comparator<KV<K, V>> {
+ @Override
+ public int compare(KV<K, V> a, KV<K, V> b) {
+ if (a.getKey().compareTo(b.getKey()) == 0) {
+ return a.getValue().compareTo(b.getValue());
+ }
+ return a.getKey().compareTo(b.getKey());
+ }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext p) {
+ // Get the Map Containing the size from side-input
+ Map<KV<String, Long>, Long> rangeSizesMap = p.sideInput(rangeSizes);
+
+ // The FileRange Pair must be sorted
+ SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new
FileRangeComparator<>());
+
+ // Initialize sorted map with values
+ for (Map.Entry<KV<String, Long>, Long> entry :
rangeSizesMap.entrySet()) {
+ sorted.put(entry.getKey(), entry.getValue());
+ }
+
+ // HashMap that tracks lines passed for each file
+ Map<String, Long> pastLines = new HashMap<>();
+
+ // For each (File, Range) Pair, compute the number of lines before it
+ for (Map.Entry entry : sorted.entrySet()) {
+ Long lines = (long) entry.getValue();
Review comment:
Ack.
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline
characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n},
{@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the
last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the
stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first
delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {
+ byte[] delimiter;
+
+ // Used to Override isSplittable
+ private boolean hasMultilineCSVRecords;
+
+ @Override
+ protected boolean isSplittable() throws Exception {
+ if (hasMultilineCSVRecords) {
+ return false;
+ }
+ return super.isSplittable();
+ }
+
+ ContextualTextIOSource(
+ ValueProvider<String> fileSpec,
+ EmptyMatchTreatment emptyMatchTreatment,
+ byte[] delimiter,
+ boolean hasMultilineCSVRecords) {
+ super(fileSpec, emptyMatchTreatment, 1L);
+ this.delimiter = delimiter;
+ this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+ }
+
+ private ContextualTextIOSource(
+ MatchResult.Metadata metadata,
+ long start,
+ long end,
+ byte[] delimiter,
+ boolean hasMultilineCSVRecords) {
+ super(metadata, 1L, start, end);
+ this.delimiter = delimiter;
+ this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+ }
+
+ @Override
+ protected FileBasedSource<RecordWithMetadata> createForSubrangeOfFile(
+ MatchResult.Metadata metadata, long start, long end) {
+ return new ContextualTextIOSource(metadata, start, end, delimiter,
hasMultilineCSVRecords);
+ }
+
+ @Override
+ protected FileBasedReader<RecordWithMetadata>
createSingleFileReader(PipelineOptions options) {
+ return new MultiLineTextBasedReader(this, delimiter,
hasMultilineCSVRecords);
+ }
+
+ @Override
+ public Coder<RecordWithMetadata> getOutputCoder() {
+ SchemaCoder<RecordWithMetadata> coder = null;
+ try {
+ coder =
SchemaRegistry.createDefault().getSchemaCoder(RecordWithMetadata.class);
+ } catch (NoSuchSchemaException e) {
+ System.out.println("No Coder!");
Review comment:
Ack.
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -99,14 +126,23 @@ private ContextualTextIOSource(MatchResult.Metadata
metadata, long start, long e
private volatile long startOfNextRecord;
private volatile boolean eof;
private volatile boolean elementIsPresent;
- private @Nullable String currentValue;
+ private @Nullable RecordWithMetadata currentValue;
private @Nullable ReadableByteChannel inChannel;
private byte @Nullable [] delimiter;
- private TextBasedReader(ContextualTextIOSource source, byte[] delimiter) {
+ // Add to override the isSplittable
+ private boolean hasRFC4180MultiLineColumn;
+
+ private long startingOffset;
+ private long readerlineNum;
+
+ private MultiLineTextBasedReader(
+ ContextualTextIOSource source, byte[] delimiter, boolean
hasRFC4180MultiLineColumn) {
Review comment:
Ack.
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -259,7 +317,31 @@ private void decodeCurrentElement() throws IOException {
if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
dataToDecode = dataToDecode.substring(UTF8_BOM.size());
}
- currentValue = dataToDecode.toStringUtf8();
+
+ /////////////////////////////////////////////
+
+ // Data of the Current Line
+ // dataToDecode.toStringUtf8();
+
+ // The line num is:
Review comment:
Removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]