[ 
https://issues.apache.org/jira/browse/BEAM-10124?focusedWorklogId=493625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-493625
 ]

ASF GitHub Bot logged work on BEAM-10124:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Oct/20 18:07
            Start Date: 01/Oct/20 18:07
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #12924:
URL: https://github.com/apache/beam/pull/12924#discussion_r498425569



##########
File path: 
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean 
hasMultilineCSVRecords) {
 
   /** Disable construction of utility class. */
   private ContextualTextIO() {}
+
+  private static class ProcessRecordNumbers extends 
PTransform<PCollection<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollection<Row> records) {
+      /*
+       * At this point the line number in RecordWithMetadata contains the 
relative line offset from the beginning of the read range.
+       *
+       * To compute the absolute position from the beginning of the input we 
group the lines within the same ranges, and evaluate the size of each range.
+       */
+
+      // This algorithm only works with triggers that fire once, for now only 
default trigger is
+      // supported.
+      Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+      Set<Trigger> allowedTriggers =
+          ImmutableSet.of(
+              Repeatedly.forever(AfterWatermark.pastEndOfWindow()), 
DefaultTrigger.of());
+
+      Preconditions.checkArgument(
+          allowedTriggers.contains(currentTrigger),
+          String.format(
+              "getWithRecordNumMetadata only support the default trigger not. 
%s", currentTrigger));

Review comment:
       ```suggestion
                 "getWithRecordNumMetadata(true) only supports the default 
trigger not: %s", currentTrigger));
   ```

##########
File path: 
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -167,26 +177,26 @@
  *      .apply(ContextualTextIO.readFiles());
  * }</pre>
  *
- * <p>Example 6: reading without recordNum metadata, or only fileName 
associated Metadata. (the
- * Objects would still contain recordNums, but these recordNums would 
correspond to their positions
- * in their respective offsets rather than their positions within the entire 
file).
+ * <p>Example 6: reading with recordNum metadata. (the Objects still contain 
recordNums, but these
+ * recordNums would correspond to their positions in their respective offsets 
rather than their
+ * positions within the entire file).
  *
  * <pre>{@code
  * Pipeline p = ...;
  *
  * PCollection<Row> records = p.apply(ContextualTextIO.read()
  *     .from("/local/path/to/files/*.csv")
- *      .setWithoutRecordNumMetadata(true));
+ *      .setWithRecordNumMetadata(true));
  * }</pre>
  *
  * <p>NOTE: When using {@link 
ContextualTextIO.Read#withHasMultilineCSVRecords(Boolean)} this
  * option, a single reader will be used to process the file, rather than 
multiple readers which can
  * read from different offsets. For a large file this can result in lower 
performance.
  *
- * <p>NOTE: Use {@link Read#withoutRecordNumMetadata()} when recordNum 
metadata is not required, for
- * example, when when only filename metadata is required. Computing record 
positions currently
- * introduces a shuffle step, which increases the resources used by the 
pipeline. <b> By default
- * withoutRecordNumMetadata is set to false, so the shuffle step is 
performed.</b>
+ * <p>NOTE: Use {@link Read#withRecordNumMetadata()} when recordNum metadata 
is required. Computing
+ * record positions currently introduces a shuffle step, which increases the 
resources used by the

Review comment:
       ```suggestion
    * record positions currently introduces a grouping step, which increases 
the resources used by the
   ```

##########
File path: 
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -637,4 +619,68 @@ private CreateTextSourceFn(byte[] delimiter, boolean 
hasMultilineCSVRecords) {
 
   /** Disable construction of utility class. */
   private ContextualTextIO() {}
+
+  private static class ProcessRecordNumbers extends 
PTransform<PCollection<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollection<Row> records) {
+      /*
+       * At this point the line number in RecordWithMetadata contains the 
relative line offset from the beginning of the read range.
+       *
+       * To compute the absolute position from the beginning of the input we 
group the lines within the same ranges, and evaluate the size of each range.
+       */
+
+      // This algorithm only works with triggers that fire once, for now only 
default trigger is
+      // supported.
+      Trigger currentTrigger = records.getWindowingStrategy().getTrigger();
+
+      Set<Trigger> allowedTriggers =
+          ImmutableSet.of(
+              Repeatedly.forever(AfterWatermark.pastEndOfWindow()), 
DefaultTrigger.of());
+
+      Preconditions.checkArgument(
+          allowedTriggers.contains(currentTrigger),
+          String.format(
+              "getWithRecordNumMetadata only support the default trigger not. 
%s", currentTrigger));
+
+      PCollection<KV<KV<String, Long>, Row>> recordsGroupedByFileAndRange =
+          records
+              .apply("AddFileNameAndRange", ParDo.of(new 
AddFileNameAndRange()))
+              .setCoder(
+                  KvCoder.of(
+                      KvCoder.of(StringUtf8Coder.of(), 
BigEndianLongCoder.of()),
+                      RowCoder.of(RecordWithMetadata.getSchema())));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          recordsGroupedByFileAndRange
+              .apply("CountRecordsForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help 
compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          records.getPipeline().apply("CreateSingletonPcoll", 
Create.of(Arrays.asList(1)));
+
+      /*
+       * For each (File, Offset) pair, calculate the number of lines occurring 
before the Range for each file
+       *
+       * After computing the number of lines before each range, we can find 
the line number in original file as numLiesBeforeOffset + lineNumInCurrentOffset

Review comment:
       ```suggestion
          * After computing the number of lines before each range, we can find 
the line number in original file as numLinesBeforeOffset + 
lineNumInCurrentOffset
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 493625)
    Remaining Estimate: 1,322.5h  (was: 1,322h 40m)
            Time Spent: 21.5h  (was: 21h 20m)

> ContextualTextIO - An IO that is provides metadata about the line. 
> -------------------------------------------------------------------
>
>                 Key: BEAM-10124
>                 URL: https://issues.apache.org/jira/browse/BEAM-10124
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Reza ardeshir rokni
>            Assignee: Reza ardeshir rokni
>            Priority: P0
>             Fix For: 2.26.0
>
>   Original Estimate: 1,344h
>          Time Spent: 21.5h
>  Remaining Estimate: 1,322.5h
>
> There are two important Source IO's that allow for dealing with text files 
> FileIO and TextIO. When the requirements go beyond the scope of TextIO we ask 
> the end user to make use of FileIO and go it on their own.
> We want to correct this experience by creating a more feature rich TextIO 
> which can return along with each line of data contextual information about 
> the line. For example the file that it came from, and the ordinal position of 
> the line within the file.
> Another area that we would like to deal with is more formal support for CSV 
> files by allowing compliance to RFC4180 files. Specifically the RFC allows 
> for line breaks (CRLF) to be used within fields within double quotes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to