abhiy13 commented on a change in pull request #12645:
URL: https://github.com/apache/beam/pull/12645#discussion_r474445924
##########
File path:
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
}
@Override
- public PCollection<String> expand(PBegin input) {
- checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+ public PCollection<RecordWithMetadata> expand(PBegin input) {
+ checkNotNull(
+ getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+ PCollection<RecordWithMetadata> lines = null;
if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
- return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+ lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+ } else {
+ // All other cases go through FileIO + ReadFiles
+ lines =
+ input
+ .apply(
+ "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+ .apply(
+ "Read Matches",
+ FileIO.readMatches()
+ .withCompression(getCompression())
+ .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+ .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
}
- // All other cases go through FileIO + ReadFiles
- return input
- .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
- .apply(
- "Read Matches",
- FileIO.readMatches()
- .withCompression(getCompression())
- .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+ // Check if the user decided to opt out of recordNums associated with records
+ if (getWithoutLineNumMetadata()) {
+ return lines;
+ }
+
+ // At this point the line number in RecordWithMetadata contains the relative line offset from
+ // the
+ // beginning of the read range.
+
+ // To compute the absolute position from the beginning of the input,
+ // we group the lines within the same ranges, and evaluate the size of each range.
+
+ PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+ lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+ PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+ linesGroupedByFileAndRange
+ .apply("CountLinesForEachFileRange", Count.perKey())
+ .apply("SizesAsView", View.asMap());
+
+ // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+ // before each Range
+ PCollection<Integer> singletonPcoll =
+ input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+ // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
Review comment:
Ack.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org