[
https://issues.apache.org/jira/browse/BEAM-10124?focusedWorklogId=493625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-493625
]
ASF GitHub Bot logged work on BEAM-10124:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Oct/20 18:07
Start Date: 01/Oct/20 18:07
Worklog Time Spent: 10m
Work Description: lukecwik commented on a change in pull request #12924:
URL: https://github.com/apache/beam/pull/12924#discussion_r498425569
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean
hasMultilineCSVRecords) {
/** Disable construction of utility class. */
private ContextualTextIO() {}
+
+ private static class ProcessRecordNumbers extends
PTransform<PCollection<Row>, PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollection<Row> records) {
+ /*
+ * At this point the line number in RecordWithMetadata contains the
relative line offset from the beginning of the read range.
+ *
+ * To compute the absolute position from the beginning of the input we
group the lines within the same ranges, and evaluate the size of each range.
+ */
+
+ // This algorithm only works with triggers that fire once, for now only
default trigger is
+ // supported.
+ Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+ Set<Trigger> allowedTriggers =
+ ImmutableSet.of(
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
DefaultTrigger.of());
+
+ Preconditions.checkArgument(
+ allowedTriggers.contains(currentTrigger),
+ String.format(
+ "getWithRecordNumMetadata only support the default trigger not.
%s", currentTrigger));
Review comment:
```suggestion
"getWithRecordNumMetadata(true) only supports the default
trigger not: %s", currentTrigger));
```
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -167,26 +177,26 @@
* .apply(ContextualTextIO.readFiles());
* }</pre>
*
- * <p>Example 6: reading without recordNum metadata, or only fileName
associated Metadata. (the
- * Objects would still contain recordNums, but these recordNums would
correspond to their positions
- * in their respective offsets rather than their positions within the entire
file).
+ * <p>Example 6: reading with recordNum metadata. (the Objects still contain
recordNums, but these
+ * recordNums would correspond to their positions in their respective offsets
rather than their
+ * positions within the entire file).
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<Row> records = p.apply(ContextualTextIO.read()
* .from("/local/path/to/files/*.csv")
- * .setWithoutRecordNumMetadata(true));
+ * .setWithRecordNumMetadata(true));
* }</pre>
*
* <p>NOTE: When using {@link
ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} this
* option, a single reader will be used to process the file, rather than
multiple readers which can
* read from different offsets. For a large file this can result in lower
performance.
*
- * <p>NOTE: Use {@link Read#withoutRecordNumMetadata()} when recordNum
metadata is not required, for
- * example, when when only filename metadata is required. Computing record
positions currently
- * introduces a shuffle step, which increases the resources used by the
pipeline. <b> By default
- * withoutRecordNumMetadata is set to false, so the shuffle step is
performed.</b>
+ * <p>NOTE: Use {@link Read#withRecordNumMetadata()} when recordNum metadata
is required. Computing
+ * record positions currently introduces a shuffle step, which increases the
resources used by the
Review comment:
```suggestion
* record positions currently introduces a grouping step, which increases
the resources used by the
```
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean
hasMultilineCSVRecords) {
/** Disable construction of utility class. */
private ContextualTextIO() {}
+
+ private static class ProcessRecordNumbers extends
PTransform<PCollection<Row>, PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollection<Row> records) {
+ /*
+ * At this point the line number in RecordWithMetadata contains the
relative line offset from the beginning of the read range.
+ *
+ * To compute the absolute position from the beginning of the input we
group the lines within the same ranges, and evaluate the size of each range.
+ */
+
+ // This algorithm only works with triggers that fire once, for now only
default trigger is
+ // supported.
+ Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+ Set<Trigger> allowedTriggers =
+ ImmutableSet.of(
+ Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
DefaultTrigger.of());
+
+ Preconditions.checkArgument(
+ allowedTriggers.contains(currentTrigger),
+ String.format(
+ "getWithRecordNumMetadata only support the default trigger not.
%s", currentTrigger));
+
+ PCollection<KV<KV<String, Long>, Row>> recordsGroupedByFileAndRange =
+ records
+ .apply("AddFileNameAndRange", ParDo.of(new
AddFileNameAndRange()))
+ .setCoder(
+ KvCoder.of(
+ KvCoder.of(StringUtf8Coder.of(),
BigEndianLongCoder.of()),
+ RowCoder.of(RecordWithMetadata.getSchema())));
+
+ PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+ recordsGroupedByFileAndRange
+ .apply("CountRecordsForEachFileRange", Count.perKey())
+ .apply("SizesAsView", View.asMap());
+
+ // Get Pipeline to create a dummy PCollection with one element to help
compute the lines
+ // before each Range
+ PCollection<Integer> singletonPcoll =
+ records.getPipeline().apply("CreateSingletonPcoll",
Create.of(Arrays.asList(1)));
+
+ /*
+ * For each (File, Offset) pair, calculate the number of lines occurring
before the Range for each file
+ *
+ * After computing the number of lines before each range, we can find
the line number in original file as numLiesBeforeOffset + lineNumInCurrentOffset
Review comment:
```suggestion
* After computing the number of lines before each range, we can find
the line number in original file as numLinesBeforeOffset +
lineNumInCurrentOffset
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 493625)
Remaining Estimate: 1,322.5h (was: 1,322h 40m)
Time Spent: 21.5h (was: 21h 20m)
> ContextualTextIO - An IO that is provides metadata about the line.
> -------------------------------------------------------------------
>
> Key: BEAM-10124
> URL: https://issues.apache.org/jira/browse/BEAM-10124
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Reza ardeshir rokni
> Assignee: Reza ardeshir rokni
> Priority: P0
> Fix For: 2.26.0
>
> Original Estimate: 1,344h
> Time Spent: 21.5h
> Remaining Estimate: 1,322.5h
>
> There are two important Source IO's that allow for dealing with text files
> FileIO and TextIO. When the requirements go beyond the scope of TextIO we ask
> the end user to make use of FileIO and go it on their own.
> We want to correct this experience by creating a more feature rich TextIO
> which can return along with each line of data contextual information about
> the line. For example the file that it came from, and the ordinal position of
> the line within the file.
> Another area that we would like to deal with is more formal support for CSV
> files by allowing compliance to RFC4180 files. Specifically the RFC allows
> for line breaks (CRLF) to be used within fields within double quotes.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)