[ 
https://issues.apache.org/jira/browse/BEAM-10124?focusedWorklogId=507476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-507476
 ]

ASF GitHub Bot logged work on BEAM-10124:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Nov/20 03:47
            Start Date: 04/Nov/20 03:47
    Worklog Time Spent: 10m 
      Work Description: rezarokni commented on a change in pull request #13247:
URL: https://github.com/apache/beam/pull/13247#discussion_r517081519



##########
File path: 
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -423,90 +422,91 @@ public void processElement(
     }
 
     /**
-     * Helper class for computing number of record in the File preceding the 
beginning of the Range
-     * in this file.
+     * Helper class for computing number of records in the File preceding the 
beginning of each read
+     * range (offset) in this file.
      */
     @VisibleForTesting
-    static class ComputeRecordsBeforeEachRange extends DoFn<Integer, 
KV<KV<String, Long>, Long>> {
+    static class ComputeRecordsBeforeEachRange extends DoFn<Integer, 
KV<String, Map<Long, Long>>> {
       private final PCollectionView<Map<String, Iterable<KV<Long, Long>>>> 
rangeSizes;
 
       public ComputeRecordsBeforeEachRange(
           PCollectionView<Map<String, Iterable<KV<Long, Long>>>> rangeSizes) {
         this.rangeSizes = rangeSizes;
       }
 
-      // Add custom comparator as KV<K, V> is not comparable by default
-      private static class FileRangeComparator<K extends Comparable<K>, V 
extends Comparable<V>>
-          implements Comparator<KV<K, V>>, Serializable {
-        @Override
-        public int compare(KV<K, V> a, KV<K, V> b) {
-          if (a.getKey().compareTo(b.getKey()) == 0) {
-            return a.getValue().compareTo(b.getValue());
-          }
-          return a.getKey().compareTo(b.getKey());
-        }
-      }
-
       @ProcessElement
       public void processElement(ProcessContext p) {
-        // Process each file from which is a key from the side input
-
-        // Get the Map Containing the size from side-input
+        // Get the multimap side input containing the size of each read range.
         Map<String, Iterable<KV<Long, Long>>> rangeSizesMap = 
p.sideInput(rangeSizes);
 
+        // Process each file, retrieving each filename as key from the side 
input.
         for (Entry<String, Iterable<KV<Long, Long>>> entrySet : 
rangeSizesMap.entrySet()) {
-          // The FileRange Pair must be sorted
-          SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new 
FileRangeComparator<>());
+          // The offsets in the file must be sorted.
+          SortedMap<Long, Long> sorted = new TreeMap<>();
 
           entrySet
               .getValue()
               .iterator()
-              .forEachRemaining(
-                  x -> sorted.put(KV.of(entrySet.getKey(), x.getKey()), 
x.getValue()));
-
-          // HashMap that tracks number of records passed for each file
-          Map<String, Long> pastRecords = new HashMap<>();
-
-          // For each (File, Range) Pair, compute the number of records before 
it
-          for (Map.Entry<KV<String, Long>, Long> entry : sorted.entrySet()) {
-            Long numRecords = entry.getValue();
-            KV<String, Long> fileRange = entry.getKey();
-            String file = fileRange.getKey();
-            Long numRecordsBefore = 0L;
-            if (pastRecords.containsKey(file)) {
-              numRecordsBefore = pastRecords.get(file);
-            }
-            p.output(KV.of(fileRange, numRecordsBefore));
-            pastRecords.put(file, numRecordsBefore + numRecords);
+              .forEachRemaining(x -> sorted.put(x.getKey(), x.getValue()));
+
+          String file = entrySet.getKey();
+          Map<Long, Long> numRecordsBeforeEachOffset = new HashMap<>();
+          Long numRecordsBefore = 0L;
+          for (Map.Entry<Long, Long> entry : sorted.entrySet()) {
+            Long offset = entry.getKey();
+            Long numRecordsInRangeStartingAtThisOffset = entry.getValue();
+            numRecordsBeforeEachOffset.put(offset, numRecordsBefore);
+            numRecordsBefore += numRecordsInRangeStartingAtThisOffset;
           }
+          p.output(KV.of(file, numRecordsBeforeEachOffset));
         }
       }
     }
 
+    /**
+     * Helper transform for computing absolute position of each record given 
the read range of each
+     * record and a side input describing the describing number of records 
that precede the
+     * beginning of each read range.
+     */
     static class AssignRecordNums extends DoFn<KV<KV<String, Long>, Row>, Row> 
{

Review comment:
       Can we leave a TODO for a performance test to be added as part of the 
performance suite?
   

##########
File path: 
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -423,90 +422,91 @@ public void processElement(
     }
 
     /**
-     * Helper class for computing number of record in the File preceding the 
beginning of the Range
-     * in this file.
+     * Helper class for computing number of records in the File preceding the 
beginning of each read
+     * range (offset) in this file.
      */
     @VisibleForTesting
-    static class ComputeRecordsBeforeEachRange extends DoFn<Integer, 
KV<KV<String, Long>, Long>> {
+    static class ComputeRecordsBeforeEachRange extends DoFn<Integer, 
KV<String, Map<Long, Long>>> {
       private final PCollectionView<Map<String, Iterable<KV<Long, Long>>>> 
rangeSizes;
 
       public ComputeRecordsBeforeEachRange(
           PCollectionView<Map<String, Iterable<KV<Long, Long>>>> rangeSizes) {
         this.rangeSizes = rangeSizes;
       }
 
-      // Add custom comparator as KV<K, V> is not comparable by default
-      private static class FileRangeComparator<K extends Comparable<K>, V 
extends Comparable<V>>
-          implements Comparator<KV<K, V>>, Serializable {
-        @Override
-        public int compare(KV<K, V> a, KV<K, V> b) {
-          if (a.getKey().compareTo(b.getKey()) == 0) {
-            return a.getValue().compareTo(b.getValue());
-          }
-          return a.getKey().compareTo(b.getKey());
-        }
-      }
-
       @ProcessElement
       public void processElement(ProcessContext p) {
-        // Process each file from which is a key from the side input
-
-        // Get the Map Containing the size from side-input
+        // Get the multimap side input containing the size of each read range.
         Map<String, Iterable<KV<Long, Long>>> rangeSizesMap = 
p.sideInput(rangeSizes);
 
+        // Process each file, retrieving each filename as key from the side 
input.
         for (Entry<String, Iterable<KV<Long, Long>>> entrySet : 
rangeSizesMap.entrySet()) {
-          // The FileRange Pair must be sorted
-          SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new 
FileRangeComparator<>());
+          // The offsets in the file must be sorted.
+          SortedMap<Long, Long> sorted = new TreeMap<>();
 
           entrySet
               .getValue()
               .iterator()
-              .forEachRemaining(
-                  x -> sorted.put(KV.of(entrySet.getKey(), x.getKey()), 
x.getValue()));
-
-          // HashMap that tracks number of records passed for each file
-          Map<String, Long> pastRecords = new HashMap<>();
-
-          // For each (File, Range) Pair, compute the number of records before 
it
-          for (Map.Entry<KV<String, Long>, Long> entry : sorted.entrySet()) {
-            Long numRecords = entry.getValue();
-            KV<String, Long> fileRange = entry.getKey();
-            String file = fileRange.getKey();
-            Long numRecordsBefore = 0L;
-            if (pastRecords.containsKey(file)) {
-              numRecordsBefore = pastRecords.get(file);
-            }
-            p.output(KV.of(fileRange, numRecordsBefore));
-            pastRecords.put(file, numRecordsBefore + numRecords);
+              .forEachRemaining(x -> sorted.put(x.getKey(), x.getValue()));
+
+          String file = entrySet.getKey();
+          Map<Long, Long> numRecordsBeforeEachOffset = new HashMap<>();
+          Long numRecordsBefore = 0L;
+          for (Map.Entry<Long, Long> entry : sorted.entrySet()) {
+            Long offset = entry.getKey();
+            Long numRecordsInRangeStartingAtThisOffset = entry.getValue();
+            numRecordsBeforeEachOffset.put(offset, numRecordsBefore);
+            numRecordsBefore += numRecordsInRangeStartingAtThisOffset;
           }
+          p.output(KV.of(file, numRecordsBeforeEachOffset));
         }
       }
     }
 
+    /**
+     * Helper transform for computing absolute position of each record given 
the read range of each
+     * record and a side input describing the describing number of records 
that precede the
+     * beginning of each read range.
+     */
     static class AssignRecordNums extends DoFn<KV<KV<String, Long>, Row>, Row> 
{
-      PCollectionView<Map<KV<String, Long>, Long>> numRecordsBeforeEachRange;
+      PCollectionView<Map<String, Map<Long, Long>>> numRecordsBeforeEachRange;
 
       public AssignRecordNums(
-          PCollectionView<Map<KV<String, Long>, Long>> 
numRecordsBeforeEachRange) {
+          PCollectionView<Map<String, Map<Long, Long>>> 
numRecordsBeforeEachRange) {
         this.numRecordsBeforeEachRange = numRecordsBeforeEachRange;
       }
 
       @ProcessElement
       public void processElement(ProcessContext p) {
-        Long range = p.element().getKey().getValue();
         String file = p.element().getKey().getKey();
+        Long offset = p.element().getKey().getValue();
         Row record = p.element().getValue();
-        Long numRecordsLessThanThisRange =
-            p.sideInput(numRecordsBeforeEachRange).get(KV.of(file, range));
+
+        Map<Long, Long> numRecordsBeforeEachOffsetInFile =
+            p.sideInput(numRecordsBeforeEachRange).get(file);
+        Long numRecordsLessThanThisOffset = 
numRecordsBeforeEachOffsetInFile.get(offset);
+
         Row newLine =
             Row.fromRow(record)
                 .withFieldValue(
                     RecordWithMetadata.RECORD_NUM,
                     record.getInt64(RecordWithMetadata.RECORD_NUM_IN_OFFSET)
-                        + numRecordsLessThanThisRange)
+                        + numRecordsLessThanThisOffset)
                 .build();
         p.output(newLine);
       }
+
+      private Long getNumRecordsBeforeOffset(
+          Long offset, Iterator<KV<Long, Long>> 
numRecordsBeforeEachOffsetInFile) {
+        while (numRecordsBeforeEachOffsetInFile.hasNext()) {
+          KV<Long, Long> entry = numRecordsBeforeEachOffsetInFile.next();
+          if (entry.getKey().equals(offset)) {
+            return entry.getValue();
+          }
+        }
+        LOG.error("Unable to compute contextual metadata. Please report a bug 
in ContextualTextIO");
+        return null;

Review comment:
       Should we throw an exception here to fail the job? It might be difficult
for the user to debug the null in their metadata.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 507476)
    Remaining Estimate: 1,318h 40m  (was: 1,318h 50m)
            Time Spent: 25h 20m  (was: 25h 10m)

> ContextualTextIO - An IO that provides metadata about the line.
> -------------------------------------------------------------------
>
>                 Key: BEAM-10124
>                 URL: https://issues.apache.org/jira/browse/BEAM-10124
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Reza ardeshir rokni
>            Assignee: Valentyn Tymofieiev
>            Priority: P0
>             Fix For: 2.26.0
>
>   Original Estimate: 1,344h
>          Time Spent: 25h 20m
>  Remaining Estimate: 1,318h 40m
>
> There are two important source IOs for dealing with text files: FileIO and
> TextIO. When the requirements go beyond the scope of TextIO, we ask the end
> user to make use of FileIO and go it on their own.
> We want to correct this experience by creating a more feature-rich TextIO
> which can return, along with each line of data, contextual information about
> the line: for example, the file that it came from and the ordinal position of
> the line within the file.
> Another area that we would like to address is more formal support for CSV
> files, by allowing files that comply with RFC 4180. Specifically, the RFC
> allows line breaks (CRLF) to be used within fields enclosed in double quotes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to