rezarokni commented on a change in pull request #13247:
URL: https://github.com/apache/beam/pull/13247#discussion_r517081519
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -423,90 +422,91 @@ public void processElement(
}
/**
- * Helper class for computing number of record in the File preceding the
beginning of the Range
- * in this file.
+ * Helper class for computing number of records in the File preceding the
beginning of each read
+ * range (offset) in this file.
*/
@VisibleForTesting
- static class ComputeRecordsBeforeEachRange extends DoFn<Integer,
KV<KV<String, Long>, Long>> {
+ static class ComputeRecordsBeforeEachRange extends DoFn<Integer,
KV<String, Map<Long, Long>>> {
private final PCollectionView<Map<String, Iterable<KV<Long, Long>>>>
rangeSizes;
public ComputeRecordsBeforeEachRange(
PCollectionView<Map<String, Iterable<KV<Long, Long>>>> rangeSizes) {
this.rangeSizes = rangeSizes;
}
- // Add custom comparator as KV<K, V> is not comparable by default
- private static class FileRangeComparator<K extends Comparable<K>, V
extends Comparable<V>>
- implements Comparator<KV<K, V>>, Serializable {
- @Override
- public int compare(KV<K, V> a, KV<K, V> b) {
- if (a.getKey().compareTo(b.getKey()) == 0) {
- return a.getValue().compareTo(b.getValue());
- }
- return a.getKey().compareTo(b.getKey());
- }
- }
-
@ProcessElement
public void processElement(ProcessContext p) {
- // Process each file from which is a key from the side input
-
- // Get the Map Containing the size from side-input
+ // Get the multimap side input containing the size of each read range.
Map<String, Iterable<KV<Long, Long>>> rangeSizesMap =
p.sideInput(rangeSizes);
+ // Process each file, retrieving each filename as key from the side
input.
for (Entry<String, Iterable<KV<Long, Long>>> entrySet :
rangeSizesMap.entrySet()) {
- // The FileRange Pair must be sorted
- SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new
FileRangeComparator<>());
+ // The offsets in the file must be sorted.
+ SortedMap<Long, Long> sorted = new TreeMap<>();
entrySet
.getValue()
.iterator()
- .forEachRemaining(
- x -> sorted.put(KV.of(entrySet.getKey(), x.getKey()),
x.getValue()));
-
- // HashMap that tracks number of records passed for each file
- Map<String, Long> pastRecords = new HashMap<>();
-
- // For each (File, Range) Pair, compute the number of records before
it
- for (Map.Entry<KV<String, Long>, Long> entry : sorted.entrySet()) {
- Long numRecords = entry.getValue();
- KV<String, Long> fileRange = entry.getKey();
- String file = fileRange.getKey();
- Long numRecordsBefore = 0L;
- if (pastRecords.containsKey(file)) {
- numRecordsBefore = pastRecords.get(file);
- }
- p.output(KV.of(fileRange, numRecordsBefore));
- pastRecords.put(file, numRecordsBefore + numRecords);
+ .forEachRemaining(x -> sorted.put(x.getKey(), x.getValue()));
+
+ String file = entrySet.getKey();
+ Map<Long, Long> numRecordsBeforeEachOffset = new HashMap<>();
+ Long numRecordsBefore = 0L;
+ for (Map.Entry<Long, Long> entry : sorted.entrySet()) {
+ Long offset = entry.getKey();
+ Long numRecordsInRangeStartingAtThisOffset = entry.getValue();
+ numRecordsBeforeEachOffset.put(offset, numRecordsBefore);
+ numRecordsBefore += numRecordsInRangeStartingAtThisOffset;
}
+ p.output(KV.of(file, numRecordsBeforeEachOffset));
}
}
}
+ /**
+ * Helper transform for computing absolute position of each record given
the read range of each
record and a side input describing the number of records
that precede the
+ * beginning of each read range.
+ */
static class AssignRecordNums extends DoFn<KV<KV<String, Long>, Row>, Row>
{
Review comment:
Can we leave a TODO for a performance test to be added as part of the
performance suite?
##########
File path:
sdks/java/io/contextualtextio/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -423,90 +422,91 @@ public void processElement(
}
/**
- * Helper class for computing number of record in the File preceding the
beginning of the Range
- * in this file.
+ * Helper class for computing number of records in the File preceding the
beginning of each read
+ * range (offset) in this file.
*/
@VisibleForTesting
- static class ComputeRecordsBeforeEachRange extends DoFn<Integer,
KV<KV<String, Long>, Long>> {
+ static class ComputeRecordsBeforeEachRange extends DoFn<Integer,
KV<String, Map<Long, Long>>> {
private final PCollectionView<Map<String, Iterable<KV<Long, Long>>>>
rangeSizes;
public ComputeRecordsBeforeEachRange(
PCollectionView<Map<String, Iterable<KV<Long, Long>>>> rangeSizes) {
this.rangeSizes = rangeSizes;
}
- // Add custom comparator as KV<K, V> is not comparable by default
- private static class FileRangeComparator<K extends Comparable<K>, V
extends Comparable<V>>
- implements Comparator<KV<K, V>>, Serializable {
- @Override
- public int compare(KV<K, V> a, KV<K, V> b) {
- if (a.getKey().compareTo(b.getKey()) == 0) {
- return a.getValue().compareTo(b.getValue());
- }
- return a.getKey().compareTo(b.getKey());
- }
- }
-
@ProcessElement
public void processElement(ProcessContext p) {
- // Process each file from which is a key from the side input
-
- // Get the Map Containing the size from side-input
+ // Get the multimap side input containing the size of each read range.
Map<String, Iterable<KV<Long, Long>>> rangeSizesMap =
p.sideInput(rangeSizes);
+ // Process each file, retrieving each filename as key from the side
input.
for (Entry<String, Iterable<KV<Long, Long>>> entrySet :
rangeSizesMap.entrySet()) {
- // The FileRange Pair must be sorted
- SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new
FileRangeComparator<>());
+ // The offsets in the file must be sorted.
+ SortedMap<Long, Long> sorted = new TreeMap<>();
entrySet
.getValue()
.iterator()
- .forEachRemaining(
- x -> sorted.put(KV.of(entrySet.getKey(), x.getKey()),
x.getValue()));
-
- // HashMap that tracks number of records passed for each file
- Map<String, Long> pastRecords = new HashMap<>();
-
- // For each (File, Range) Pair, compute the number of records before
it
- for (Map.Entry<KV<String, Long>, Long> entry : sorted.entrySet()) {
- Long numRecords = entry.getValue();
- KV<String, Long> fileRange = entry.getKey();
- String file = fileRange.getKey();
- Long numRecordsBefore = 0L;
- if (pastRecords.containsKey(file)) {
- numRecordsBefore = pastRecords.get(file);
- }
- p.output(KV.of(fileRange, numRecordsBefore));
- pastRecords.put(file, numRecordsBefore + numRecords);
+ .forEachRemaining(x -> sorted.put(x.getKey(), x.getValue()));
+
+ String file = entrySet.getKey();
+ Map<Long, Long> numRecordsBeforeEachOffset = new HashMap<>();
+ Long numRecordsBefore = 0L;
+ for (Map.Entry<Long, Long> entry : sorted.entrySet()) {
+ Long offset = entry.getKey();
+ Long numRecordsInRangeStartingAtThisOffset = entry.getValue();
+ numRecordsBeforeEachOffset.put(offset, numRecordsBefore);
+ numRecordsBefore += numRecordsInRangeStartingAtThisOffset;
}
+ p.output(KV.of(file, numRecordsBeforeEachOffset));
}
}
}
+ /**
+ * Helper transform for computing absolute position of each record given
the read range of each
record and a side input describing the number of records
that precede the
+ * beginning of each read range.
+ */
static class AssignRecordNums extends DoFn<KV<KV<String, Long>, Row>, Row>
{
- PCollectionView<Map<KV<String, Long>, Long>> numRecordsBeforeEachRange;
+ PCollectionView<Map<String, Map<Long, Long>>> numRecordsBeforeEachRange;
public AssignRecordNums(
- PCollectionView<Map<KV<String, Long>, Long>>
numRecordsBeforeEachRange) {
+ PCollectionView<Map<String, Map<Long, Long>>>
numRecordsBeforeEachRange) {
this.numRecordsBeforeEachRange = numRecordsBeforeEachRange;
}
@ProcessElement
public void processElement(ProcessContext p) {
- Long range = p.element().getKey().getValue();
String file = p.element().getKey().getKey();
+ Long offset = p.element().getKey().getValue();
Row record = p.element().getValue();
- Long numRecordsLessThanThisRange =
- p.sideInput(numRecordsBeforeEachRange).get(KV.of(file, range));
+
+ Map<Long, Long> numRecordsBeforeEachOffsetInFile =
+ p.sideInput(numRecordsBeforeEachRange).get(file);
+ Long numRecordsLessThanThisOffset =
numRecordsBeforeEachOffsetInFile.get(offset);
+
Row newLine =
Row.fromRow(record)
.withFieldValue(
RecordWithMetadata.RECORD_NUM,
record.getInt64(RecordWithMetadata.RECORD_NUM_IN_OFFSET)
- + numRecordsLessThanThisRange)
+ + numRecordsLessThanThisOffset)
.build();
p.output(newLine);
}
+
+ private Long getNumRecordsBeforeOffset(
+ Long offset, Iterator<KV<Long, Long>>
numRecordsBeforeEachOffsetInFile) {
+ while (numRecordsBeforeEachOffsetInFile.hasNext()) {
+ KV<Long, Long> entry = numRecordsBeforeEachOffsetInFile.next();
+ if (entry.getKey().equals(offset)) {
+ return entry.getValue();
+ }
+ }
+ LOG.error("Unable to compute contextual metadata. Please report a bug
in ContextualTextIO");
+ return null;
Review comment:
Should we throw an exception here to fail the job? It might be difficult
for the user to debug the null in their metadata.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]