davisusanibar commented on code in PR #38423:
URL: https://github.com/apache/arrow/pull/38423#discussion_r1419830287
##########
java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java:
##########
@@ -164,12 +146,59 @@ public boolean loadNextBatch() throws IOException {
ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
loadRecordBatch(batch);
+ try {
+ loadDictionaries();
+ } catch (IOException e) {
+ batch.close();
+ throw e;
+ }
return true;
} else {
return false;
}
}
+ /**
+ * Loads any dictionaries that may be needed by the given record batch. It
attempts
+ * to read as little as possible but may read in more deltas than are
necessary for blocks
+ * toward the end of the file.
+ */
+ private void loadDictionaries() throws IOException {
+ // initial load
+ if (currentDictionaryBatch == 0) {
+ for (int i = 0; i < dictionaries.size(); i++) {
+ ArrowBlock block =
footer.getDictionaries().get(currentDictionaryBatch++);
+ ArrowDictionaryBatch dictionaryBatch = readDictionaryBatch(in, block,
allocator);
+ loadDictionary(dictionaryBatch, false);
+ }
+ estimatedDictionaryRecordBatch++;
+ } else {
+ // we need to look for delta dictionaries. It involves a look-ahead,
unfortunately.
+ HashSet<Long> visited = new HashSet<Long>();
+ while (estimatedDictionaryRecordBatch < currentRecordBatch &&
+ currentDictionaryBatch < footer.getDictionaries().size()) {
+ ArrowBlock block =
footer.getDictionaries().get(currentDictionaryBatch++);
+ ArrowDictionaryBatch dictionaryBatch = readDictionaryBatch(in, block,
allocator);
+ long dictionaryId = dictionaryBatch.getDictionaryId();
+ if (visited.contains(dictionaryId)) {
+ // done
+ currentDictionaryBatch--;
+ estimatedDictionaryRecordBatch++;
+ } else if (!dictionaries.containsKey(dictionaryId)) {
+ throw new IOException("Dictionary ID " + dictionaryId + " was
written " +
+ "after the initial batch. The file does not follow the IPC file
protocol.");
+ } else if (!dictionaryBatch.isDelta()) {
+ throw new IOException("Dictionary ID " + dictionaryId + " was
written as a replacement " +
+ "after the initial batch. Replacement dictionaries are not
currently allowed in the IPC file protocol.");
+ } else {
+ loadDictionary(dictionaryBatch, true);
+ }
+ }
+ if (currentDictionaryBatch >= footer.getDictionaries().size()) {
Review Comment:
Is this check necessary? Could you verify whether the unit tests pass both with and without this validation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]