jduo commented on code in PR #38423:
URL: https://github.com/apache/arrow/pull/38423#discussion_r1385786095


##########
java/vector/src/main/java/org/apache/arrow/vector/dictionary/BatchedDictionary.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.dictionary;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.hash.MurmurHasher;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ipc.ArrowWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+/**
+ * A dictionary implementation that can be used when writing batches of data to
+ * a stream or file. Supports delta or replacement encoding.
+ */
+public class BatchedDictionary implements Closeable, BaseDictionary {
+
+  private final DictionaryEncoding encoding;
+
+  private final BaseVariableWidthVector dictionary;
+
+  private final BaseIntVector indexVector;
+
+  private final DictionaryHashTable hashTable;
+
+  private final boolean forFileIPC;
+
+  private int deltaIndex;
+
+  private int dictionaryIndex;
+
+  private boolean wasReset;
+
+  /**
+   * Creates a dictionary with two vectors of the given types. The dictionary 
vector
+   * will be named "{name}-dictionary".
+   * <p>
+   * To use this dictionary, provide the dictionary vector to a {@link 
DictionaryProvider},
+   * add the {@link #getIndexVector()} to the {@link 
org.apache.arrow.vector.VectorSchemaRoot}
+   * and call the {@link #setSafe(int, byte[], int, int)} or other set methods.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream 
IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a 
file IPC.
+   * @param allocator The allocator to use.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator
+  ) {
+    this(name, encoding, dictionaryType, indexType, forFileIPC, allocator, 
"-dictionary");
+  }
+
+  /**
+   * Creates a dictionary with two vectors of the given types.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream 
IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a 
file IPC.
+   * @param allocator The allocator to use.
+   * @param suffix A non-null suffix to append to the name of the dictionary.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator,
+      String suffix
+  ) {
+    this.encoding = encoding;
+    this.forFileIPC = forFileIPC;
+    FieldVector vector = new FieldType(false, dictionaryType, null)
+        .createNewSingleVector(name + suffix, allocator, null);
+    if (!(BaseVariableWidthVector.class.isAssignableFrom(vector.getClass()))) {

Review Comment:
   It seems we should be able to validate dictionaryType before creating the
   vector, rather than checking the type of the vector after it is constructed.



##########
java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java:
##########
@@ -131,4 +150,67 @@ public void testFileStreamHasEos() throws IOException {
       }
     }
   }
+
+  @ParameterizedTest
+  @MethodSource("dictionaryParams")
+  public void testMultiBatchDictionaries(int state) throws Exception {
+    File file = new File("target/mytest_multi_batch_dictionaries_" + state + 
".arrow");

Review Comment:
   This wouldn't work correctly if the tests were built and run as a JAR. These
   inputs should probably be moved to test resources and opened using
   getResource(). This could be handled as a separate issue.
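
   The file in this test is written by the test itself rather than read as a
   fixture, so besides getResource() another option is a temporary file. The
   sketch below is an assumption, not something proposed in the PR: the class
   name TempFileForTestOutput and the roundTrip helper are hypothetical, and it
   uses only java.nio.file so that it does not depend on a working directory
   such as "target/".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempFileForTestOutput {

  /**
   * Writes the payload to a fresh temporary file, reads it back, and deletes
   * the file afterwards. The file name mirrors the one used in the test.
   */
  static String roundTrip(int state, String payload) {
    try {
      Path file = Files.createTempFile("mytest_multi_batch_dictionaries_" + state + "_", ".arrow");
      try {
        Files.write(file, payload.getBytes(StandardCharsets.UTF_8));
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
      } finally {
        // Clean up even if the write or read fails.
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip(1, "hello"));
  }
}
```

   In a JUnit 5 test the same effect could likely be had with a @TempDir
   parameter, which would also take care of the cleanup.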



##########
java/vector/src/main/java/org/apache/arrow/vector/dictionary/BatchedDictionary.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.dictionary;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.hash.MurmurHasher;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ipc.ArrowWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+/**
+ * A dictionary implementation that can be used when writing batches of data to
+ * a stream or file. Supports delta or replacement encoding.
+ */
+public class BatchedDictionary implements Closeable, BaseDictionary {
+
+  private final DictionaryEncoding encoding;
+
+  private final BaseVariableWidthVector dictionary;
+
+  private final BaseIntVector indexVector;
+
+  private final DictionaryHashTable hashTable;
+
+  private final boolean forFileIPC;
+
+  private int deltaIndex;
+
+  private int dictionaryIndex;
+
+  private boolean wasReset;
+
+  /**
+   * Creates a dictionary with two vectors of the given types. The dictionary 
vector
+   * will be named "{name}-dictionary".
+   * <p>
+   * To use this dictionary, provide the dictionary vector to a {@link 
DictionaryProvider},
+   * add the {@link #getIndexVector()} to the {@link 
org.apache.arrow.vector.VectorSchemaRoot}
+   * and call the {@link #setSafe(int, byte[], int, int)} or other set methods.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream 
IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a 
file IPC.
+   * @param allocator The allocator to use.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator
+  ) {
+    this(name, encoding, dictionaryType, indexType, forFileIPC, allocator, 
"-dictionary");
+  }
+
+  /**
+   * Creates a dictionary with two vectors of the given types.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream 
IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a 
file IPC.
+   * @param allocator The allocator to use.
+   * @param suffix A non-null suffix to append to the name of the dictionary.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator,
+      String suffix
+  ) {
+    this.encoding = encoding;
+    this.forFileIPC = forFileIPC;
+    FieldVector vector = new FieldType(false, dictionaryType, null)
+        .createNewSingleVector(name + suffix, allocator, null);
+    if (!(BaseVariableWidthVector.class.isAssignableFrom(vector.getClass()))) {
+      throw new IllegalArgumentException("Dictionary must be a superclass of 
'BaseVariableWidthVector' " +
+          "such as 'VarCharVector'.");
+    }
+    dictionary = (BaseVariableWidthVector) vector;
+    vector = new FieldType(true, indexType, encoding)
+        .createNewSingleVector(name, allocator, null);
+    if (!(BaseIntVector.class.isAssignableFrom(vector.getClass()))) {
+      throw new IllegalArgumentException("Index vector must be a superclass 
type of 'BaseIntVector' " +

Review Comment:
   It would be good to validate both dictionaryType and indexType before
   creating either vector, so that the constructor never throws with vectors
   already constructed (any vectors that were constructed should be released
   with close()).
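
   A minimal sketch of the validate-first ordering, using stand-in types only.
   Kind, FakeVector and DictionaryPair are hypothetical and are not Arrow
   classes; in the real constructor the checks would presumably be made against
   the ArrowType arguments (for example, an instanceof test for ArrowType.Int
   on indexType), which is an assumption on my part.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ValidateBeforeAllocate {

  /** Counts live stand-in vectors so that a leak would be visible. */
  static final AtomicInteger LIVE = new AtomicInteger();

  /** Hypothetical stand-in for the type descriptors passed to the constructor. */
  enum Kind { VARCHAR, VARBINARY, INT, FLOAT }

  /** Hypothetical stand-in for an allocated vector. */
  static final class FakeVector implements AutoCloseable {
    FakeVector() {
      LIVE.incrementAndGet();
    }

    @Override
    public void close() {
      LIVE.decrementAndGet();
    }
  }

  static final class DictionaryPair implements AutoCloseable {
    private final FakeVector dictionary;
    private final FakeVector index;

    DictionaryPair(Kind dictionaryKind, Kind indexKind) {
      // Check both arguments before anything is allocated.
      if (dictionaryKind != Kind.VARCHAR && dictionaryKind != Kind.VARBINARY) {
        throw new IllegalArgumentException("Dictionary must be variable width: " + dictionaryKind);
      }
      if (indexKind != Kind.INT) {
        throw new IllegalArgumentException("Index must be an integer type: " + indexKind);
      }
      // Allocation happens only after every check has passed.
      dictionary = new FakeVector();
      index = new FakeVector();
    }

    @Override
    public void close() {
      index.close();
      dictionary.close();
    }
  }

  public static void main(String[] args) {
    try {
      new DictionaryPair(Kind.VARCHAR, Kind.FLOAT);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected, live vectors: " + LIVE.get());
    }
  }
}
```

   With this ordering a bad indexType is rejected while no vector exists yet,
   so there is nothing to release on the failure path.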



##########
java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java:
##########
@@ -846,4 +862,293 @@ protected void validateListAsMapData(VectorSchemaRoot 
root) {
       }
     }
   }
+
+  /**
+   * Utility to write permutations of dictionary encoding.
+   *
+   * state == 1, one delta dictionary.

Review Comment:
   Could this be an enum?
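
   As a rough sketch of what that might look like: the enum below names the
   seven states listed in the Javadoc of writeDataMultiBatchWithDictionaries
   and records which of the test dictionaries (vectorA to vectorD) each one
   registers, following the switch in that method. The names Scenario and Vec
   and the constant names are hypothetical.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

public class DictionaryScenarios {

  /** The four test dictionaries: vectorA, vectorB, vectorC, vectorD. */
  enum Vec { A, B, C, D }

  /** Hypothetical replacement for the int "state" parameter. */
  enum Scenario {
    ONE_DELTA(EnumSet.of(Vec.A)),
    ONE_STANDALONE(EnumSet.of(Vec.B)),
    ONE_OF_EACH(EnumSet.of(Vec.A, Vec.B)),
    DELTA_EMPTY_AT_START_AND_END(EnumSet.of(Vec.C)),
    BOTH_DELTAS(EnumSet.of(Vec.A, Vec.C)),
    BOTH_DELTAS_AND_STANDALONE(EnumSet.of(Vec.A, Vec.B, Vec.C)),
    REPLACEMENT(EnumSet.of(Vec.D));

    private final Set<Vec> vectors;

    Scenario(Set<Vec> vectors) {
      this.vectors = Collections.unmodifiableSet(vectors);
    }

    /** Dictionaries that are put into the provider and the schema root. */
    Set<Vec> vectors() {
      return vectors;
    }
  }

  public static void main(String[] args) {
    for (Scenario s : Scenario.values()) {
      System.out.println(s + " -> " + s.vectors());
    }
  }
}
```

   Both switch statements could then iterate over vectors(), and the parameter
   source could use Scenario.values() instead of a hard-coded integer range.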



##########
java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java:
##########
@@ -164,12 +146,54 @@ public boolean loadNextBatch() throws IOException {
       ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
       ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
       loadRecordBatch(batch);
+      loadDictionaries();

Review Comment:
   If loadDictionaries() throws an exception, does the batch leak?
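
   Whether it leaks depends on whether loadRecordBatch() already releases the
   batch, which is not visible in this hunk. If it does not, a try/finally
   around the later steps would cover it. The sketch below uses hypothetical
   stand-ins (FakeBatch, and a Runnable in place of loadDictionaries()) rather
   than the Arrow classes.

```java
public class CloseBatchOnFailure {

  /** Hypothetical stand-in for a record batch that owns buffers. */
  static final class FakeBatch implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
      closed = true;
    }
  }

  /**
   * Runs the step that follows loading the batch and releases the batch
   * whether or not that step throws.
   */
  static void loadNextBatch(FakeBatch batch, Runnable laterStep) {
    try {
      // The batch would be loaded into the schema root here.
      laterStep.run();
    } finally {
      batch.close();
    }
  }

  public static void main(String[] args) {
    FakeBatch batch = new FakeBatch();
    try {
      loadNextBatch(batch, () -> {
        throw new IllegalStateException("dictionary load failed");
      });
    } catch (IllegalStateException e) {
      System.out.println("closed after failure: " + batch.closed);
    }
  }
}
```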



##########
java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java:
##########
@@ -79,15 +84,24 @@ public final Set<Long> getDictionaryIds() {
     }
 
     @Override
-    public Dictionary lookup(long id) {
+    public BaseDictionary lookup(long id) {
       return map.get(id);
     }
 
     @Override
     public void close() {
-      for (Dictionary dictionary : map.values()) {
+      for (BaseDictionary dictionary : map.values()) {
         dictionary.getVector().close();
       }
     }
+
+    @Override
+    public void resetBatchedDictionaries() {

Review Comment:
   It is strange that only batched dictionaries need to be reset.



##########
java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java:
##########
@@ -846,4 +862,293 @@ protected void validateListAsMapData(VectorSchemaRoot 
root) {
       }
     }
   }
+
+  /**
+   * Utility to write permutations of dictionary encoding.
+   *
+   * state == 1, one delta dictionary.
+   * state == 2, one standalone dictionary.
+   * state == 3, one of each
+   * state == 4, delta with nothing at start and end
+   * state == 5, both deltas
+   * state == 6, both deltas and standalone
+   * state == 7, replacement dictionary
+   */
+  protected void writeDataMultiBatchWithDictionaries(OutputStream stream, int 
state) throws IOException {
+    DictionaryProvider.MapDictionaryProvider provider = new 
DictionaryProvider.MapDictionaryProvider();
+    DictionaryEncoding deltaEncoding =
+        new DictionaryEncoding(42, false, new ArrowType.Int(16, false), true);
+    DictionaryEncoding replacementEncoding =
+        new DictionaryEncoding(24, false, new ArrowType.Int(16, false), false);
+    DictionaryEncoding deltaCEncoding =
+        new DictionaryEncoding(1, false, new ArrowType.Int(16, false), true);
+    DictionaryEncoding replacementEncodingUpdated =
+        new DictionaryEncoding(2, false, new ArrowType.Int(16, false), false);
+
+    boolean isFile = stream instanceof FileOutputStream;
+    try (BatchedDictionary vectorA = newDictionary("vectorA", deltaEncoding, 
isFile);
+         BatchedDictionary vectorB = newDictionary("vectorB", 
replacementEncoding, isFile);
+         BatchedDictionary vectorC = newDictionary("vectorC", deltaCEncoding, 
isFile);
+         BatchedDictionary vectorD = newDictionary("vectorD", 
replacementEncodingUpdated, isFile);) {
+      switch (state) {
+        case 1:
+          provider.put(vectorA);
+          break;
+        case 2:
+          provider.put(vectorB);
+          break;
+        case 3:
+          provider.put(vectorA);
+          provider.put(vectorB);
+          break;
+        case 4:
+          provider.put(vectorC);
+          break;
+        case 5:
+          provider.put(vectorA);
+          provider.put(vectorC);
+          break;
+        case 6:
+          provider.put(vectorA);
+          provider.put(vectorB);
+          provider.put(vectorC);
+          break;
+        case 7:
+          provider.put(vectorD);
+          break;
+        default:
+          throw new IllegalStateException("Unsupported state: " + state);
+      }
+
+      VectorSchemaRoot root = null;
+      switch (state) {
+        case 1:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector());
+          break;
+        case 2:
+          root = VectorSchemaRoot.of(vectorB.getIndexVector());
+          break;
+        case 3:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorB.getIndexVector());
+          break;
+        case 4:
+          root = VectorSchemaRoot.of(vectorC.getIndexVector());
+          break;
+        case 5:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorC.getIndexVector());
+          break;
+        case 6:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorB.getIndexVector(), vectorC.getIndexVector());
+          break;
+        case 7:
+          root = VectorSchemaRoot.of(vectorD.getIndexVector());
+          break;
+        default:
+          throw new IllegalStateException("Unsupported state: " + state);
+      }
+
+      ArrowWriter arrowWriter = null;
+      try {
+        if (stream instanceof FileOutputStream) {
+          FileChannel channel = ((FileOutputStream) stream).getChannel();
+          arrowWriter = new ArrowFileWriter(root, provider, channel);
+        } else {
+          arrowWriter = new ArrowStreamWriter(root, provider, stream);
+        }
+
+        vectorA.setSafe(0, "foo".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(0, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(1, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorC.setNull(0);
+        vectorC.setNull(1);
+        vectorD.setSafe(0, "porro".getBytes(StandardCharsets.UTF_8));
+        vectorD.setSafe(1, "amet".getBytes(StandardCharsets.UTF_8));
+
+        // batch 1
+        arrowWriter.start();
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 2
+        vectorA.setSafe(0, "meep".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(0, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(1, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorC.setSafe(0, "qui".getBytes(StandardCharsets.UTF_8));
+        vectorC.setSafe(1, "dolor".getBytes(StandardCharsets.UTF_8));
+        vectorD.setSafe(0, "amet".getBytes(StandardCharsets.UTF_8));
+        if (state == 7) {
+          vectorD.setSafe(1, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 3
+        vectorA.setNull(0);
+        vectorA.setNull(1);
+        vectorB.setSafe(0, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorB.setNull(1);
+        vectorC.setNull(0);
+        vectorC.setSafe(1, "qui".getBytes(StandardCharsets.UTF_8));
+        vectorD.setNull(0);
+        if (state == 7) {
+          vectorD.setSafe(1, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 4
+        vectorA.setSafe(0, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "zap".getBytes(StandardCharsets.UTF_8));
+        vectorB.setNull(0);
+        vectorB.setSafe(1, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorC.setNull(0);
+        vectorC.setNull(1);
+        if (state == 7) {
+          vectorD.setSafe(0, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+        vectorD.setNull(1);
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        arrowWriter.end();
+      } catch (Exception e) {
+        if (arrowWriter != null) {
+          arrowWriter.close();
+        }
+        throw e;
+      }
+    }
+  }
+
+  Map<Integer, String[][]> valuesPerBlock = new HashMap<Integer, String[][]>();
+
+  {
+    valuesPerBlock.put(0, new String[][]{
+        {"foo", "bar"},
+        {"lorem", "ipsum"},
+        {null, null},
+        {"porro", "amet"}
+    });
+    valuesPerBlock.put(1, new String[][]{
+        {"meep", "bar"},
+        {"ipsum", "lorem"},
+        {"qui", "dolor"},
+        {"amet", "quia"}
+    });
+    valuesPerBlock.put(2, new String[][]{
+        {null, null},
+        {"ipsum", null},
+        {null, "qui"},
+        {null, "quia"}
+    });
+    valuesPerBlock.put(3, new String[][]{
+        {"bar", "zap"},
+        {null, "lorem"},
+        {null, null},
+        {"quia", null}
+    });
+  }
+
+  protected void assertDictionary(FieldVector encoded, ArrowReader reader, 
String... expected) throws Exception {
+    DictionaryEncoding dictionaryEncoding = encoded.getField().getDictionary();
+    BaseDictionary dictionary = 
reader.getDictionaryVectors().get(dictionaryEncoding.getId());
+    try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
+      Assertions.assertEquals(expected.length, encoded.getValueCount());
+      for (int i = 0; i < expected.length; i++) {
+        if (expected[i] == null) {
+          Assertions.assertNull(decoded.getObject(i));

Review Comment:
   It would be good to be consistent about static imports of assertion 
functions. assertNotNull() is statically imported but assertNull() isn't.



##########
java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java:
##########
@@ -79,15 +84,24 @@ public final Set<Long> getDictionaryIds() {
     }
 
     @Override
-    public Dictionary lookup(long id) {
+    public BaseDictionary lookup(long id) {
       return map.get(id);
     }
 
     @Override
     public void close() {
-      for (Dictionary dictionary : map.values()) {
+      for (BaseDictionary dictionary : map.values()) {

Review Comment:
   Does the dictionary itself need to close()?



##########
java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java:
##########
@@ -846,4 +862,293 @@ protected void validateListAsMapData(VectorSchemaRoot 
root) {
       }
     }
   }
+
+  /**
+   * Utility to write permutations of dictionary encoding.
+   *
+   * state == 1, one delta dictionary.
+   * state == 2, one standalone dictionary.
+   * state == 3, one of each
+   * state == 4, delta with nothing at start and end
+   * state == 5, both deltas
+   * state == 6, both deltas and standalone
+   * state == 7, replacement dictionary
+   */
+  protected void writeDataMultiBatchWithDictionaries(OutputStream stream, int 
state) throws IOException {
+    DictionaryProvider.MapDictionaryProvider provider = new 
DictionaryProvider.MapDictionaryProvider();
+    DictionaryEncoding deltaEncoding =
+        new DictionaryEncoding(42, false, new ArrowType.Int(16, false), true);
+    DictionaryEncoding replacementEncoding =
+        new DictionaryEncoding(24, false, new ArrowType.Int(16, false), false);
+    DictionaryEncoding deltaCEncoding =
+        new DictionaryEncoding(1, false, new ArrowType.Int(16, false), true);
+    DictionaryEncoding replacementEncodingUpdated =
+        new DictionaryEncoding(2, false, new ArrowType.Int(16, false), false);
+
+    boolean isFile = stream instanceof FileOutputStream;
+    try (BatchedDictionary vectorA = newDictionary("vectorA", deltaEncoding, 
isFile);
+         BatchedDictionary vectorB = newDictionary("vectorB", 
replacementEncoding, isFile);
+         BatchedDictionary vectorC = newDictionary("vectorC", deltaCEncoding, 
isFile);
+         BatchedDictionary vectorD = newDictionary("vectorD", 
replacementEncodingUpdated, isFile);) {
+      switch (state) {
+        case 1:
+          provider.put(vectorA);
+          break;
+        case 2:
+          provider.put(vectorB);
+          break;
+        case 3:
+          provider.put(vectorA);
+          provider.put(vectorB);
+          break;
+        case 4:
+          provider.put(vectorC);
+          break;
+        case 5:
+          provider.put(vectorA);
+          provider.put(vectorC);
+          break;
+        case 6:
+          provider.put(vectorA);
+          provider.put(vectorB);
+          provider.put(vectorC);
+          break;
+        case 7:
+          provider.put(vectorD);
+          break;
+        default:
+          throw new IllegalStateException("Unsupported state: " + state);
+      }
+
+      VectorSchemaRoot root = null;
+      switch (state) {
+        case 1:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector());
+          break;
+        case 2:
+          root = VectorSchemaRoot.of(vectorB.getIndexVector());
+          break;
+        case 3:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorB.getIndexVector());
+          break;
+        case 4:
+          root = VectorSchemaRoot.of(vectorC.getIndexVector());
+          break;
+        case 5:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorC.getIndexVector());
+          break;
+        case 6:
+          root = VectorSchemaRoot.of(vectorA.getIndexVector(), 
vectorB.getIndexVector(), vectorC.getIndexVector());
+          break;
+        case 7:
+          root = VectorSchemaRoot.of(vectorD.getIndexVector());
+          break;
+        default:
+          throw new IllegalStateException("Unsupported state: " + state);
+      }
+
+      ArrowWriter arrowWriter = null;
+      try {
+        if (stream instanceof FileOutputStream) {
+          FileChannel channel = ((FileOutputStream) stream).getChannel();
+          arrowWriter = new ArrowFileWriter(root, provider, channel);
+        } else {
+          arrowWriter = new ArrowStreamWriter(root, provider, stream);
+        }
+
+        vectorA.setSafe(0, "foo".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(0, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(1, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorC.setNull(0);
+        vectorC.setNull(1);
+        vectorD.setSafe(0, "porro".getBytes(StandardCharsets.UTF_8));
+        vectorD.setSafe(1, "amet".getBytes(StandardCharsets.UTF_8));
+
+        // batch 1
+        arrowWriter.start();
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 2
+        vectorA.setSafe(0, "meep".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(0, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorB.setSafe(1, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorC.setSafe(0, "qui".getBytes(StandardCharsets.UTF_8));
+        vectorC.setSafe(1, "dolor".getBytes(StandardCharsets.UTF_8));
+        vectorD.setSafe(0, "amet".getBytes(StandardCharsets.UTF_8));
+        if (state == 7) {
+          vectorD.setSafe(1, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 3
+        vectorA.setNull(0);
+        vectorA.setNull(1);
+        vectorB.setSafe(0, "ipsum".getBytes(StandardCharsets.UTF_8));
+        vectorB.setNull(1);
+        vectorC.setNull(0);
+        vectorC.setSafe(1, "qui".getBytes(StandardCharsets.UTF_8));
+        vectorD.setNull(0);
+        if (state == 7) {
+          vectorD.setSafe(1, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        // batch 4
+        vectorA.setSafe(0, "bar".getBytes(StandardCharsets.UTF_8));
+        vectorA.setSafe(1, "zap".getBytes(StandardCharsets.UTF_8));
+        vectorB.setNull(0);
+        vectorB.setSafe(1, "lorem".getBytes(StandardCharsets.UTF_8));
+        vectorC.setNull(0);
+        vectorC.setNull(1);
+        if (state == 7) {
+          vectorD.setSafe(0, "quia".getBytes(StandardCharsets.UTF_8));
+        }
+        vectorD.setNull(1);
+
+        root.setRowCount(2);
+        arrowWriter.writeBatch();
+
+        arrowWriter.end();
+      } catch (Exception e) {
+        if (arrowWriter != null) {
+          arrowWriter.close();
+        }
+        throw e;
+      }
+    }
+  }
+
+  Map<Integer, String[][]> valuesPerBlock = new HashMap<Integer, String[][]>();
+
+  {
+    valuesPerBlock.put(0, new String[][]{
+        {"foo", "bar"},
+        {"lorem", "ipsum"},
+        {null, null},
+        {"porro", "amet"}
+    });
+    valuesPerBlock.put(1, new String[][]{
+        {"meep", "bar"},
+        {"ipsum", "lorem"},
+        {"qui", "dolor"},
+        {"amet", "quia"}
+    });
+    valuesPerBlock.put(2, new String[][]{
+        {null, null},
+        {"ipsum", null},
+        {null, "qui"},
+        {null, "quia"}
+    });
+    valuesPerBlock.put(3, new String[][]{
+        {"bar", "zap"},
+        {null, "lorem"},
+        {null, null},
+        {"quia", null}
+    });
+  }
+
+  protected void assertDictionary(FieldVector encoded, ArrowReader reader, 
String... expected) throws Exception {
+    DictionaryEncoding dictionaryEncoding = encoded.getField().getDictionary();
+    BaseDictionary dictionary = 
reader.getDictionaryVectors().get(dictionaryEncoding.getId());
+    try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
+      Assertions.assertEquals(expected.length, encoded.getValueCount());
+      for (int i = 0; i < expected.length; i++) {
+        if (expected[i] == null) {
+          Assertions.assertNull(decoded.getObject(i));
+        } else {
+          assertNotNull(decoded.getObject(i));
+          Assertions.assertEquals(expected[i], 
decoded.getObject(i).toString());
+        }
+      }
+    }
+  }
+
+  protected void assertBlock(File file, int block, int state) throws Exception 
{
+    try (FileInputStream fileInputStream = new FileInputStream(file);
+         ArrowFileReader reader = new 
ArrowFileReader(fileInputStream.getChannel(), allocator);) {
+      reader.loadRecordBatch(reader.getRecordBlocks().get(block));
+      assertBlock(reader, block, state);
+    }
+  }
+
+  protected void assertBlock(ArrowReader reader, int block, int state) throws 
Exception {
+    VectorSchemaRoot r = reader.getVectorSchemaRoot();
+    FieldVector dictA = r.getVector("vectorA");
+    FieldVector dictB = r.getVector("vectorB");
+    FieldVector dictC = r.getVector("vectorC");
+    FieldVector dictD = r.getVector("vectorD");
+
+    switch (state) {
+      case 1:
+        assertDictionary(dictA, reader, valuesPerBlock.get(block)[0]);
+        assertNull(dictB);
+        assertNull(dictC);
+        assertNull(dictD);
+        break;
+      case 2:
+        assertNull(dictA);
+        assertDictionary(dictB, reader, valuesPerBlock.get(block)[1]);
+        assertNull(dictC);
+        assertNull(dictD);
+        break;
+      case 3:
+        assertDictionary(dictA, reader, valuesPerBlock.get(block)[0]);
+        assertDictionary(dictB, reader, valuesPerBlock.get(block)[1]);
+        assertNull(dictC);
+        assertNull(dictD);
+        break;
+      case 4:
+        assertNull(dictA);
+        assertNull(dictB);
+        assertDictionary(dictC, reader, valuesPerBlock.get(block)[2]);
+        assertNull(dictD);
+        break;
+      case 5:
+        assertDictionary(dictA, reader, valuesPerBlock.get(block)[0]);
+        assertNull(dictB);
+        assertDictionary(dictC, reader, valuesPerBlock.get(block)[2]);
+        assertNull(dictD);
+        break;
+      case 6:
+        assertDictionary(dictA, reader, valuesPerBlock.get(block)[0]);
+        assertDictionary(dictB, reader, valuesPerBlock.get(block)[1]);
+        assertDictionary(dictC, reader, valuesPerBlock.get(block)[2]);
+        assertNull(dictD);
+        break;
+      case 7:
+        assertNull(dictA);
+        assertNull(dictB);
+        assertNull(dictC);
+        assertDictionary(dictD, reader, valuesPerBlock.get(block)[3]);
+        break;
+      default:
+        throw new IllegalStateException("Unsupported state: " + state);
+    }
+  }
+
+  protected static Collection<Arguments> dictionaryParams() {
+    List<Arguments> params = new ArrayList<>();
+    for (int i = 1; i < 8; i++) {

Review Comment:
   It's not clear where these numbers (1 through 7) come from.



##########
java/vector/src/main/java/org/apache/arrow/vector/dictionary/BatchedDictionary.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.dictionary;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.hash.MurmurHasher;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ipc.ArrowWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.FieldType;
+
+/**
+ * A dictionary implementation that can be used when writing batches of data to
+ * a stream or file. Supports delta or replacement encoding.
+ */
+public class BatchedDictionary implements Closeable, BaseDictionary {
+
+  private final DictionaryEncoding encoding;
+
+  private final BaseVariableWidthVector dictionary;
+
+  private final BaseIntVector indexVector;
+
+  private final DictionaryHashTable hashTable;
+
+  private final boolean forFileIPC;
+
+  private int deltaIndex;
+
+  private int dictionaryIndex;
+
+  private boolean wasReset;
+
+  /**
+   * Creates a dictionary with two vectors of the given types. The dictionary vector
+   * will be named "{name}-dictionary".
+   * <p>
+   * To use this dictionary, provide the dictionary vector to a {@link DictionaryProvider},
+   * add the {@link #getIndexVector()} to the {@link org.apache.arrow.vector.VectorSchemaRoot}
+   * and call the {@link #setSafe(int, byte[], int, int)} or other set methods.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a file IPC.
+   * @param allocator The allocator to use.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator
+  ) {
+    this(name, encoding, dictionaryType, indexType, forFileIPC, allocator, "-dictionary");
+  }
+
+  /**
+   * Creates a dictionary with two vectors of the given types.
+   *
+   * @param name A name for the vector and dictionary.
+   * @param encoding The dictionary encoding to use.
+   * @param dictionaryType The type of the dictionary data.
+   * @param indexType The type of the encoded dictionary index.
+   * @param forFileIPC Whether the data will be written to a file or stream IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a file IPC.
+   * @param allocator The allocator to use.
+   * @param suffix A non-null suffix to append to the name of the dictionary.
+   */
+  public BatchedDictionary(
+      String name,
+      DictionaryEncoding encoding,
+      ArrowType dictionaryType,
+      ArrowType indexType,
+      boolean forFileIPC,
+      BufferAllocator allocator,
+      String suffix
+  ) {
+    this.encoding = encoding;
+    this.forFileIPC = forFileIPC;
+    FieldVector vector = new FieldType(false, dictionaryType, null)
+        .createNewSingleVector(name + suffix, allocator, null);
+    if (!(BaseVariableWidthVector.class.isAssignableFrom(vector.getClass()))) {
+      throw new IllegalArgumentException("Dictionary must be a superclass of 'BaseVariableWidthVector' " +
+          "such as 'VarCharVector'.");
+    }
+    dictionary = (BaseVariableWidthVector) vector;
+    vector = new FieldType(true, indexType, encoding)
+        .createNewSingleVector(name, allocator, null);
+    if (!(BaseIntVector.class.isAssignableFrom(vector.getClass()))) {
+      throw new IllegalArgumentException("Index vector must be a superclass type of 'BaseIntVector' " +
+          "such as 'IntVector' or 'Uint4Vector'.");
+    }
+    indexVector = (BaseIntVector) vector;
+    hashTable = new DictionaryHashTable();
+  }
+
+  /**
+   * Creates a dictionary that will populate the provided vectors with data. Useful if
+   * dictionaries need to be children of a parent vector.
+   * @param dictionary The dictionary to hold the original data.
+   * @param indexVector The index to store the encoded offsets.
+   * @param forFileIPC Whether the data will be written to a file or stream IPC. Throws an
+   *                   exception if a replacement dictionary is provided to a file IPC.
+   */
+  public BatchedDictionary(
+      FieldVector dictionary,
+      FieldVector indexVector,
+      boolean forFileIPC
+  ) {
+    this.encoding = dictionary.getField().getDictionary();
+    this.forFileIPC = forFileIPC;
+    if (!(BaseVariableWidthVector.class.isAssignableFrom(dictionary.getClass()))) {
+      throw new IllegalArgumentException("Dictionary must be a superclass of 'BaseVariableWidthVector' " +
+          "such as 'VarCharVector'.");
+    }
+    if (dictionary.getField().isNullable()) {
+      throw new IllegalArgumentException("Dictionary must be non-nullable.");
+    }
+    this.dictionary = (BaseVariableWidthVector) dictionary;
+    if (!(BaseIntVector.class.isAssignableFrom(indexVector.getClass()))) {
+      throw new IllegalArgumentException("Index vector must be a superclass type of 'BaseIntVector' " +
+          "such as 'IntVector' or 'Uint4Vector'.");
+    }
+    this.indexVector = (BaseIntVector) indexVector;
+    hashTable = new DictionaryHashTable();
+  }
+
+  /**
+   * The index vector.
+   */
+  public FieldVector getIndexVector() {
+    return indexVector;
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return dictionary;
+  }
+
+  @Override
+  public ArrowType getVectorType() {
+    return dictionary.getField().getType();
+  }
+
+  @Override
+  public DictionaryEncoding getEncoding() {
+    return encoding;
+  }
+
+  /**
+   * Considers the entire byte array as the dictionary value. If the value is null,
+   * a null will be written to the index.
+   *
+   * @param index the value to change
+   * @param value the value to write.
+   */
+  public void setSafe(int index, byte[] value) {
+    if (value == null) {
+      setNull(index);
+      return;
+    }
+    setSafe(index, value, 0, value.length);
+  }
+
+  /**
+   * Encodes the given range in the dictionary. If the value is null, a null will be
+   * written to the index.
+   *
+   * @param index the value to change
+   * @param value the value to write.
+   * @param offset An offset into the value array.
+   * @param len The length of the value to write.
+   */
+  public void setSafe(int index, byte[] value, int offset, int len) {
+    if (value == null || len == 0) {
+      setNull(index);
+      return;
+    }
+    int di = getIndex(value, offset, len);
+    indexVector.setWithPossibleTruncate(index, di);
+  }
+
+  /**
+   * Set the element at the given index to null.
+   *
+   * @param index the value to change
+   */
+  public void setNull(int index) {
+    indexVector.setNull(index);
+  }
+
+  @Override
+  public void close() throws IOException {
+    dictionary.close();
+    indexVector.close();
+  }
+
+  /**
+   * Mark the dictionary as complete for the batch. Called by the {@link ArrowWriter}
+   * on {@link ArrowWriter#writeBatch()}.
+   */
+  public void mark() {
+    dictionary.setValueCount(dictionaryIndex);
+    // not setting the index vector value count. That will happen when the user calls
+    // VectorSchemaRoot#setRowCount().
+  }
+
+  /**
+   * Resets the dictionary to be used for a new batch. Called by the {@link ArrowWriter} on
+   * {@link ArrowWriter#writeBatch()}.
+   */
+  public void reset() {
+    wasReset = true;

Review Comment:
   `wasReset` only ever gets set to true. So if you `reset()` the dictionary and
   then populate it again, it is still marked as "wasReset". Is this intentional?
   It seems like the object basically becomes unusable once it has been reset.
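
   To illustrate the concern, here is a minimal, self-contained sketch. It is
   not the PR's code: `ResetFlagSketch`, `Sticky`, `Clearing`, `add()` and the
   `size` field are hypothetical names, and clearing the flag on the next write
   is only one possible fix, assumed here for comparison.

```java
public class ResetFlagSketch {

  /** Mirrors the pattern in question: the flag is set in reset() and never cleared. */
  static class Sticky {
    boolean wasReset;
    int size;

    void add() {
      size++;
    }

    void reset() {
      size = 0;
      wasReset = true;
    }
  }

  /** Hypothetical alternative (an assumption): the next write clears the flag. */
  static class Clearing {
    boolean wasReset;
    int size;

    void add() {
      size++;
      wasReset = false;
    }

    void reset() {
      size = 0;
      wasReset = true;
    }
  }

  public static void main(String[] args) {
    Sticky sticky = new Sticky();
    sticky.reset();
    sticky.add(); // repopulated, but the flag still reports a reset
    System.out.println("sticky.wasReset = " + sticky.wasReset);

    Clearing clearing = new Clearing();
    clearing.reset();
    clearing.add(); // repopulating clears the flag
    System.out.println("clearing.wasReset = " + clearing.wasReset);
  }
}
```

   If the sticky behaviour is intended, a short note in the Javadoc of
   `reset()` would make that explicit.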



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

