[ 
https://issues.apache.org/jira/browse/BEAM-10124?focusedWorklogId=473230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-473230
 ]

ASF GitHub Bot logged work on BEAM-10124:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Aug/20 06:59
            Start Date: 21/Aug/20 06:59
    Worklog Time Spent: 10m 
      Work Description: abhiy13 commented on a change in pull request #12645:
URL: https://github.com/apache/beam/pull/12645#discussion_r474445924



##########
File path: 
sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a 
TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a 
ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && 
!getHintMatchesManyFiles()) {
-        return input.apply("Read", 
org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", 
org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+                .apply("Match All", 
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", 
readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-          .apply("Match All", 
FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with 
records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the 
relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of 
each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> 
linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new 
AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help 
compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", 
Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring 
before the Range for

Review comment:
       Ack.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 473230)
    Remaining Estimate: 1,336h 10m  (was: 1,336h 20m)
            Time Spent: 7h 50m  (was: 7h 40m)

> ContextualTextIO - An IO that is provides metadata about the line. 
> -------------------------------------------------------------------
>
>                 Key: BEAM-10124
>                 URL: https://issues.apache.org/jira/browse/BEAM-10124
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Reza ardeshir rokni
>            Priority: P2
>   Original Estimate: 1,344h
>          Time Spent: 7h 50m
>  Remaining Estimate: 1,336h 10m
>
> There are two important Source IO's that allow for dealing with text files 
> FileIO and TextIO. When the requirements go beyond the scope of TextIO we ask 
> the end user to make use of FileIO and go it on their own.
> We want to correct this experience by creating a more feature rich TextIO 
> which can return along with each line of data contextual information about 
> the line. For example the file that it came from, and the ordinal position of 
> the line within the file.
> Another area that we would like to deal with is more formal support for CSV 
> files by allowing compliance to RFC4180 files. Specifically the RFC allows 
> for line breaks (CRLF) to be used within fields within double quotes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to