lukecwik commented on a change in pull request #12924:
URL: https://github.com/apache/beam/pull/12924#discussion_r498425569
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) {
/** Disable construction of utility class. */
private ContextualTextIO() {}
+
+ private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollection<Row> records) {
+ /*
+ * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range.
+ *
+ * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range.
+ */
+
+ // This algorithm only works with triggers that fire once, for now only default trigger is
+ // supported.
+ Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+ Set<Trigger> allowedTriggers =
+ ImmutableSet.of(
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of());
+
+ Preconditions.checkArgument(
+ allowedTriggers.contains(currentTrigger),
+ String.format(
+ "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger));
Review comment:
```suggestion
"getWithRecordNumMetadata(true) only supports the default trigger not: %s", currentTrigger));
```
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -167,26 +177,26 @@
* .apply(ContextualTextIO.readFiles());
* }</pre>
*
- * <p>Example 6: reading without recordNum metadata, or only fileName associated Metadata. (the
- * Objects would still contain recordNums, but these recordNums would correspond to their positions
- * in their respective offsets rather than their positions within the entire file).
+ * <p>Example 6: reading with recordNum metadata. (the Objects still contain recordNums, but these
+ * recordNums would correspond to their positions in their respective offsets rather than their
+ * positions within the entire file).
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<Row> records = p.apply(ContextualTextIO.read()
* .from("/local/path/to/files/*.csv")
- * .setWithoutRecordNumMetadata(true));
+ * .setWithRecordNumMetadata(true));
* }</pre>
*
* <p>NOTE: When using {@link ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} this
* option, a single reader will be used to process the file, rather than multiple readers which can
* read from different offsets. For a large file this can result in lower performance.
*
- * <p>NOTE: Use {@link Read#withoutRecordNumMetadata()} when recordNum metadata is not required, for
- * example, when when only filename metadata is required. Computing record positions currently
- * introduces a shuffle step, which increases the resources used by the pipeline. <b> By default
- * withoutRecordNumMetadata is set to false, so the shuffle step is performed.</b>
+ * <p>NOTE: Use {@link Read#withRecordNumMetadata()} when recordNum metadata is required. Computing
+ * record positions currently introduces a shuffle step, which increases the resources used by the
Review comment:
```suggestion
* record positions currently introduces a grouping step, which increases the resources used by the
```
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean hasMultilineCSVRecords) {
/** Disable construction of utility class. */
private ContextualTextIO() {}
+
+ private static class ProcessRecordNumbers extends PTransform<PCollection<Row>, PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollection<Row> records) {
+ /*
+ * At this point the line number in RecordWithMetadata contains the relative line offset from the beginning of the read range.
+ *
+ * To compute the absolute position from the beginning of the input we group the lines within the same ranges, and evaluate the size of each range.
+ */
+
+ // This algorithm only works with triggers that fire once, for now only default trigger is
+ // supported.
+ Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+ Set<Trigger> allowedTriggers =
+ ImmutableSet.of(
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()), DefaultTrigger.of());
+
+ Preconditions.checkArgument(
+ allowedTriggers.contains(currentTrigger),
+ String.format(
+ "getWithRecordNumMetadata only support the default trigger not. %s", currentTrigger));
+
+ PCollection<KV<KV<String, Long>, Row>> recordsGroupedByFileAndRange =
+ records
+ .apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()))
+ .setCoder(
+ KvCoder.of(
+ KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()),
+ RowCoder.of(RecordWithMetadata.getSchema())));
+
+ PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+ recordsGroupedByFileAndRange
+ .apply("CountRecordsForEachFileRange", Count.perKey())
+ .apply("SizesAsView", View.asMap());
+
+ // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+ // before each Range
+ PCollection<Integer> singletonPcoll =
+ records.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+ /*
+ * For each (File, Offset) pair, calculate the number of lines occurring before the Range for each file
+ *
+ * After computing the number of lines before each range, we can find the line number in original file as numLiesBeforeOffset + lineNumInCurrentOffset
Review comment:
```suggestion
* After computing the number of lines before each range, we can find the line number in original file as numLinesBeforeOffset + lineNumInCurrentOffset
```
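For readers following the comment being corrected here, the arithmetic it describes (numLinesBeforeOffset + lineNumInCurrentOffset) can be sketched in plain Java without Beam. This is only an illustration under assumptions, not the PR's implementation: the class `RecordNumSketch` and both method names are hypothetical, the map covers a single file keyed by each range's starting offset, and whether line numbers are zero- or one-based is left open because the diff does not say.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not part of ContextualTextIO.
class RecordNumSketch {

  /**
   * Takes the record count of each read range of one file, keyed by the
   * range's starting offset, and returns the number of records that occur
   * before each range (an exclusive prefix sum in ascending offset order).
   */
  static Map<Long, Long> linesBeforeEachRange(Map<Long, Long> rangeSizes) {
    Map<Long, Long> linesBefore = new LinkedHashMap<>();
    long runningTotal = 0L;
    // Copying into a TreeMap guarantees iteration in ascending offset order.
    for (Map.Entry<Long, Long> range : new TreeMap<>(rangeSizes).entrySet()) {
      linesBefore.put(range.getKey(), runningTotal);
      runningTotal += range.getValue();
    }
    return linesBefore;
  }

  /** numLinesBeforeOffset + lineNumInCurrentOffset, as in the code comment. */
  static long absoluteLineNumber(
      Map<Long, Long> linesBefore, long rangeOffset, long lineNumInCurrentOffset) {
    return linesBefore.get(rangeOffset) + lineNumInCurrentOffset;
  }

  public static void main(String[] args) {
    // Made-up example: three ranges holding 3, 2 and 4 records.
    Map<Long, Long> rangeSizes = new TreeMap<>();
    rangeSizes.put(0L, 3L);
    rangeSizes.put(100L, 2L);
    rangeSizes.put(250L, 4L);

    Map<Long, Long> linesBefore = linesBeforeEachRange(rangeSizes);
    System.out.println(linesBefore); // {0=0, 100=3, 250=5}
    System.out.println(absoluteLineNumber(linesBefore, 250L, 1L)); // 6
  }
}
```

In the PR itself the per-range counts come from `Count.perKey()` and are exposed through `View.asMap()`; the sketch only mirrors the summation step that the comment describes.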