This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new ea936e3506 GH-37910: [Java][Integration] Implement C Data Interface
integration testing (#38248)
ea936e3506 is described below
commit ea936e3506e5b408ff39a2ef762ab5fa7aba72ae
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Oct 19 14:00:28 2023 +0200
GH-37910: [Java][Integration] Implement C Data Interface integration
testing (#38248)
### Rationale for this change
### What changes are included in this PR?
### Are these changes tested?
### Are there any user-facing changes?
* Closes: #37910
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
ci/scripts/integration_arrow.sh | 4 +-
dev/archery/archery/integration/cdata.py | 18 ++-
dev/archery/archery/integration/datagen.py | 1 -
dev/archery/archery/integration/runner.py | 23 +--
dev/archery/archery/integration/tester.py | 104 ++++++------
dev/archery/archery/integration/tester_cpp.py | 11 --
dev/archery/archery/integration/tester_csharp.py | 19 ++-
dev/archery/archery/integration/tester_go.py | 18 +--
dev/archery/archery/integration/tester_java.py | 177 ++++++++++++++++++++-
docker-compose.yml | 11 +-
.../apache/arrow/c/BufferImportTypeVisitor.java | 4 +-
.../c/src/main/java/org/apache/arrow/c/Format.java | 4 +
.../java/org/apache/arrow/c/SchemaImporter.java | 2 +-
.../java/org/apache/arrow/c/DictionaryTest.java | 4 +-
.../test/java/org/apache/arrow/c/StreamTest.java | 2 +-
.../java/org/apache/arrow/vector/NullVector.java | 1 +
.../arrow/vector/compare/RangeEqualsVisitor.java | 6 +-
.../vector/dictionary/DictionaryProvider.java | 29 +++-
.../apache/arrow/vector/ipc/JsonFileReader.java | 38 +++--
.../arrow/vector/ipc/message/ArrowRecordBatch.java | 4 +-
.../org/apache/arrow/vector/util/Validator.java | 26 +++
21 files changed, 372 insertions(+), 134 deletions(-)
diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 289d376a4d..2861b1c09d 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -23,8 +23,8 @@ arrow_dir=${1}
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
pip install -e $arrow_dir/dev/archery[integration]
-# For C# C Data Interface testing
-pip install pythonnet
+# For C Data Interface testing
+pip install jpype1 pythonnet
# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1
diff --git a/dev/archery/archery/integration/cdata.py b/dev/archery/archery/integration/cdata.py
index c201f5f867..8e5550fcdb 100644
--- a/dev/archery/archery/integration/cdata.py
+++ b/dev/archery/archery/integration/cdata.py
@@ -80,6 +80,15 @@ def ffi() -> cffi.FFI:
return ffi
+def _release_memory_steps(exporter: CDataExporter, importer: CDataImporter):
+ yield
+ for i in range(max(exporter.required_gc_runs, importer.required_gc_runs)):
+ importer.run_gc()
+ yield
+ exporter.run_gc()
+ yield
+
+
@contextmanager
def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
"""
@@ -96,12 +105,13 @@ def check_memory_released(exporter: CDataExporter, importer: CDataImporter):
if do_check:
before = exporter.record_allocation_state()
yield
- # We don't use a `finally` clause: if the enclosed block raised an
- # exception, no need to add another one.
+ # Only check for memory state if `yield` didn't raise.
if do_check:
- ok = exporter.compare_allocation_state(before, importer.gc_until)
- if not ok:
+ for _ in _release_memory_steps(exporter, importer):
after = exporter.record_allocation_state()
+ if after == before:
+ break
+ if after != before:
raise RuntimeError(
f"Memory was not released correctly after roundtrip: "
f"before = {before}, after = {after} (should have been equal)")
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index f229012366..7635cfd98f 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1722,7 +1722,6 @@ def generate_dictionary_unsigned_case():
# TODO: JavaScript does not support uint64 dictionary indices, so disabled
# for now
-
# dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
fields = [
DictionaryField('f0', get_field('', 'uint8'), dict0),
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index eb2e26951c..841633f94c 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -421,17 +421,18 @@ class IntegrationRunner(object):
# Serial execution is required for proper memory accounting
serial = True
- exporter = producer.make_c_data_exporter()
- importer = consumer.make_c_data_importer()
-
- case_runner = partial(self._run_c_schema_test_case, producer, consumer,
- exporter, importer)
- self._run_test_cases(case_runner, self.json_files, serial=serial)
-
- if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER:
- case_runner = partial(self._run_c_array_test_cases, producer, consumer,
- exporter, importer)
- self._run_test_cases(case_runner, self.json_files, serial=serial)
+ with producer.make_c_data_exporter() as exporter:
+ with consumer.make_c_data_importer() as importer:
+ case_runner = partial(self._run_c_schema_test_case,
+ producer, consumer,
+ exporter, importer)
+ self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+ if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER:
+ case_runner = partial(self._run_c_array_test_cases,
+ producer, consumer,
+ exporter, importer)
+ self._run_test_cases(case_runner, self.json_files, serial=serial)
def _run_c_schema_test_case(self,
producer: Tester, consumer: Tester,
diff --git a/dev/archery/archery/integration/tester.py b/dev/archery/archery/integration/tester.py
index 6cde20e61b..eadb953a61 100644
--- a/dev/archery/archery/integration/tester.py
+++ b/dev/archery/archery/integration/tester.py
@@ -68,52 +68,52 @@ class CDataExporter(ABC):
Whether the implementation is able to release memory deterministically.
Here, "release memory" means that, after the `release` callback of
- a C Data Interface export is called, `compare_allocation_state` is
- able to trigger the deallocation of the memory underlying the export
- (for example buffer data).
+ a C Data Interface export is called, `run_gc` is able to trigger
+ the deallocation of the memory underlying the export (such as buffer data).
- If false, then `record_allocation_state` and `compare_allocation_state`
- are allowed to raise NotImplementedError.
+ If false, then `record_allocation_state` is allowed to raise
+ NotImplementedError.
"""
def record_allocation_state(self) -> object:
"""
- Record the current memory allocation state.
+ Return the current memory allocation state.
Returns
-------
state : object
- Opaque object representing the allocation state,
- for example the number of allocated bytes.
+ Equality-comparable object representing the allocation state,
+ for example the number of allocated or exported bytes.
"""
raise NotImplementedError
- def compare_allocation_state(self, recorded: object,
- gc_until: typing.Callable[[_Predicate], bool]
- ) -> bool:
+ def run_gc(self):
"""
- Compare the current memory allocation state with the recorded one.
+ Run the GC if necessary.
- Parameters
- ----------
- recorded : object
- The previous allocation state returned by
- `record_allocation_state()`
- gc_until : callable
- A callable itself accepting a callable predicate, and
- returning a boolean.
- `gc_until` should try to release memory until the predicate
- becomes true, or until it decides to give up. The final value
- of the predicate should be returned.
- `gc_until` is typically provided by the C Data Interface importer.
+ This should ensure that any temporary objects and data created by
+ previous exporter calls are collected.
+ """
- Returns
- -------
- success : bool
- Whether memory allocation state finally reached its previously
- recorded value.
+ @property
+ def required_gc_runs(self):
"""
- raise NotImplementedError
+ The maximum number of calls to `run_gc` that need to be issued to
+ ensure proper deallocation. Some implementations may require this
+ to be greater than one.
+ """
+ return 1
+
+ def close(self):
+ """
+ Final cleanup after usage.
+ """
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc):
+ self.close()
class CDataImporter(ABC):
@@ -163,32 +163,40 @@ class CDataImporter(ABC):
"""
Whether the implementation is able to release memory deterministically.
- Here, "release memory" means calling the `release` callback of
- a C Data Interface export (which should then trigger a deallocation
- mechanism on the exporter).
+ Here, "release memory" means `run_gc()` is able to trigger the
+ `release` callback of a C Data Interface export (which would then
+ induce a deallocation mechanism on the exporter).
+ """
- If false, then `gc_until` is allowed to raise NotImplementedError.
+ def run_gc(self):
"""
+ Run the GC if necessary.
- def gc_until(self, predicate: _Predicate):
+ This should ensure that any imported data has its release callback called.
"""
- Try to release memory until the predicate becomes true, or fail.
- Depending on the CDataImporter implementation, this may for example
- try once, or run a garbage collector a given number of times, or
- any other implementation-specific strategy for releasing memory.
+ @property
+ def required_gc_runs(self):
+ """
+ The maximum number of calls to `run_gc` that need to be issued to
+ ensure release callbacks are triggered. Some implementations may
+ require this to be greater than one.
+ """
+ return 1
- The running time should be kept reasonable and compatible with
- execution of multiple C Data integration tests.
+ def close(self):
+ """
+ Final cleanup after usage.
+ """
- This should not raise if `supports_releasing_memory` is true.
+ def __enter__(self):
+ return self
- Returns
- -------
- success : bool
- The final value of the predicate.
- """
- raise NotImplementedError
+ def __exit__(self, *exc):
+ # Make sure any exported data is released.
+ for i in range(self.required_gc_runs):
+ self.run_gc()
+ self.close()
class Tester:
diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py
index 866fc225d2..658e713301 100644
--- a/dev/archery/archery/integration/tester_cpp.py
+++ b/dev/archery/archery/integration/tester_cpp.py
@@ -223,13 +223,6 @@ class CppCDataExporter(CDataExporter, _CDataBase):
def record_allocation_state(self):
return self.dll.ArrowCpp_BytesAllocated()
- def compare_allocation_state(self, recorded, gc_until):
- def pred():
- # No GC on our side, so just compare allocation state
- return self.record_allocation_state() == recorded
-
- return gc_until(pred)
-
class CppCDataImporter(CDataImporter, _CDataBase):
@@ -247,7 +240,3 @@ class CppCDataImporter(CDataImporter, _CDataBase):
@property
def supports_releasing_memory(self):
return True
-
- def gc_until(self, predicate):
- # No GC on our side, so can evaluate predicate immediately
- return predicate()
diff --git a/dev/archery/archery/integration/tester_csharp.py b/dev/archery/archery/integration/tester_csharp.py
index 83b07495f9..7dca525673 100644
--- a/dev/archery/archery/integration/tester_csharp.py
+++ b/dev/archery/archery/integration/tester_csharp.py
@@ -16,7 +16,6 @@
# under the License.
from contextlib import contextmanager
-import gc
import os
from . import cdata
@@ -82,6 +81,10 @@ class _CDataBase:
schema = jf.Schema.ToArrow()
return schema, jf.Batches[num_batch].ToArrow(schema)
+ def _run_gc(self):
+ from Apache.Arrow.IntegrationTest import CDataInterface
+ CDataInterface.RunGC()
+
class CSharpCDataExporter(CDataExporter, _CDataBase):
@@ -105,6 +108,9 @@ class CSharpCDataExporter(CDataExporter, _CDataBase):
# XXX the C# GC doesn't give reliable allocation measurements
return False
+ def run_gc(self):
+ self._run_gc()
+
class CSharpCDataImporter(CDataImporter, _CDataBase):
@@ -134,15 +140,8 @@ class CSharpCDataImporter(CDataImporter, _CDataBase):
def supports_releasing_memory(self):
return True
- def gc_until(self, predicate):
- from Apache.Arrow.IntegrationTest import CDataInterface
- for i in range(3):
- if predicate():
- return True
- # Collect any C# objects hanging around through Python
- gc.collect()
- CDataInterface.RunGC()
- return predicate()
+ def run_gc(self):
+ self._run_gc()
class CSharpTester(Tester):
diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py
index b7af233f5d..2b3dc3a1be 100644
--- a/dev/archery/archery/integration/tester_go.py
+++ b/dev/archery/archery/integration/tester_go.py
@@ -200,9 +200,6 @@ class _CDataBase:
finally:
self.dll.ArrowGo_FreeError(go_error)
- def _run_gc(self):
- self.dll.ArrowGo_RunGC()
-
class GoCDataExporter(CDataExporter, _CDataBase):
# Note: the Arrow Go C Data export functions expect their output
@@ -225,14 +222,10 @@ class GoCDataExporter(CDataExporter, _CDataBase):
return True
def record_allocation_state(self):
- self._run_gc()
return self.dll.ArrowGo_BytesAllocated()
- def compare_allocation_state(self, recorded, gc_until):
- def pred():
- return self.record_allocation_state() == recorded
-
- return gc_until(pred)
+ # Note: no need to call the Go GC anywhere thanks to Arrow Go's
+ # explicit refcounting.
class GoCDataImporter(CDataImporter, _CDataBase):
@@ -252,10 +245,3 @@ class GoCDataImporter(CDataImporter, _CDataBase):
@property
def supports_releasing_memory(self):
return True
-
- def gc_until(self, predicate):
- for i in range(10):
- if predicate():
- return True
- self._run_gc()
- return False
diff --git a/dev/archery/archery/integration/tester_java.py b/dev/archery/archery/integration/tester_java.py
index 45855079eb..5684798d79 100644
--- a/dev/archery/archery/integration/tester_java.py
+++ b/dev/archery/archery/integration/tester_java.py
@@ -16,10 +16,12 @@
# under the License.
import contextlib
+import functools
import os
import subprocess
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
from .util import run_cmd, log
from ..utils.source import ARROW_ROOT_DEFAULT
@@ -32,6 +34,8 @@ def load_version_from_pom():
return version_tag.text
+# XXX Should we add "-Darrow.memory.debug.allocator=true"? It adds a couple
+# minutes to total CPU usage of the integration test suite.
_JAVA_OPTS = [
"-Dio.netty.tryReflectionSetAccessible=true",
"-Darrow.struct.conflict.policy=CONFLICT_APPEND",
@@ -42,18 +46,25 @@ _ARROW_TOOLS_JAR = os.environ.get(
"ARROW_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
- "java/tools/target/arrow-tools-{}-"
- "jar-with-dependencies.jar".format(_arrow_version),
- ),
+ "java/tools/target",
+ f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar"
+ )
+)
+_ARROW_C_DATA_JAR = os.environ.get(
+ "ARROW_C_DATA_JAVA_INTEGRATION_JAR",
+ os.path.join(
+ ARROW_ROOT_DEFAULT,
+ "java/c/target",
+ f"arrow-c-data-{_arrow_version}.jar"
+ )
)
_ARROW_FLIGHT_JAR = os.environ.get(
"ARROW_FLIGHT_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
- "java/flight/flight-integration-tests/target/"
- "flight-integration-tests-{}-jar-with-dependencies.jar".format(
- _arrow_version),
- ),
+ "java/flight/flight-integration-tests/target",
+ f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar"
+ )
)
_ARROW_FLIGHT_SERVER = (
"org.apache.arrow.flight.integration.tests.IntegrationTestServer"
@@ -63,11 +74,155 @@ _ARROW_FLIGHT_CLIENT = (
)
+@functools.lru_cache
+def setup_jpype():
+ import jpype
+ jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}"
+ # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO)
+ jpype.startJVM(jpype.getDefaultJVMPath(),
+ "-Djava.class.path=" + jar_path, *_JAVA_OPTS)
+
+
+class _CDataBase:
+
+ def __init__(self, debug, args):
+ import jpype
+ self.debug = debug
+ self.args = args
+ self.ffi = cdata.ffi()
+ setup_jpype()
+ # JPype pointers to java.io, org.apache.arrow...
+ self.java_io = jpype.JPackage("java").io
+ self.java_arrow = jpype.JPackage("org").apache.arrow
+ self.java_allocator = self._make_java_allocator()
+
+ def _pointer_to_int(self, c_ptr):
+ return int(self.ffi.cast('uintptr_t', c_ptr))
+
+ def _wrap_c_schema_ptr(self, c_schema_ptr):
+ return self.java_arrow.c.ArrowSchema.wrap(
+ self._pointer_to_int(c_schema_ptr))
+
+ def _wrap_c_array_ptr(self, c_array_ptr):
+ return self.java_arrow.c.ArrowArray.wrap(
+ self._pointer_to_int(c_array_ptr))
+
+ def _make_java_allocator(self):
+ # Return a new allocator
+ return self.java_arrow.memory.RootAllocator()
+
+ def _assert_schemas_equal(self, expected, actual):
+ # XXX This is fragile for dictionaries, as Schema.equals compares
+ # dictionary ids.
+ self.java_arrow.vector.util.Validator.compareSchemas(
+ expected, actual)
+
+ def _assert_batches_equal(self, expected, actual):
+ self.java_arrow.vector.util.Validator.compareVectorSchemaRoot(
+ expected, actual)
+
+ def _assert_dict_providers_equal(self, expected, actual):
+ self.java_arrow.vector.util.Validator.compareDictionaryProviders(
+ expected, actual)
+
+ # Note: no need to call the Java GC anywhere thanks to AutoCloseable
+
+
+class JavaCDataExporter(CDataExporter, _CDataBase):
+
+ def export_schema_from_json(self, json_path, c_schema_ptr):
+ json_file = self.java_io.File(json_path)
+ with self.java_arrow.vector.ipc.JsonFileReader(
+ json_file, self.java_allocator) as json_reader:
+ schema = json_reader.start()
+ dict_provider = json_reader
+ self.java_arrow.c.Data.exportSchema(
+ self.java_allocator, schema, dict_provider,
+ self._wrap_c_schema_ptr(c_schema_ptr)
+ )
+
+ def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+ json_file = self.java_io.File(json_path)
+ with self.java_arrow.vector.ipc.JsonFileReader(
+ json_file, self.java_allocator) as json_reader:
+ json_reader.start()
+ if num_batch > 0:
+ actually_skipped = json_reader.skip(num_batch)
+ assert actually_skipped == num_batch
+ with json_reader.read() as batch:
+ dict_provider = json_reader
+ self.java_arrow.c.Data.exportVectorSchemaRoot(
+ self.java_allocator, batch, dict_provider,
+ self._wrap_c_array_ptr(c_array_ptr))
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def record_allocation_state(self):
+ return self.java_allocator.getAllocatedMemory()
+
+ def close(self):
+ self.java_allocator.close()
+
+
+class JavaCDataImporter(CDataImporter, _CDataBase):
+
+ def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+ json_file = self.java_io.File(json_path)
+ with self.java_arrow.vector.ipc.JsonFileReader(
+ json_file, self.java_allocator) as json_reader:
+ json_schema = json_reader.start()
+ with self.java_arrow.c.CDataDictionaryProvider() as dict_provider:
+ imported_schema = self.java_arrow.c.Data.importSchema(
+ self.java_allocator,
+ self._wrap_c_schema_ptr(c_schema_ptr),
+ dict_provider)
+ self._assert_schemas_equal(json_schema, imported_schema)
+
+ def import_batch_and_compare_to_json(self, json_path, num_batch,
+ c_array_ptr):
+ json_file = self.java_io.File(json_path)
+ with self.java_arrow.vector.ipc.JsonFileReader(
+ json_file, self.java_allocator) as json_reader:
+ schema = json_reader.start()
+ if num_batch > 0:
+ actually_skipped = json_reader.skip(num_batch)
+ assert actually_skipped == num_batch
+ with json_reader.read() as batch:
+ with self.java_arrow.vector.VectorSchemaRoot.create(
+ schema, self.java_allocator) as imported_batch:
+ # We need to pass a dict provider primed with dictionary ids
+ # matching those in the schema, hence an empty
+ # CDataDictionaryProvider would not work here.
+ dict_provider = (self.java_arrow.vector.dictionary
+ .DictionaryProvider.MapDictionaryProvider())
+ dict_provider.copyStructureFrom(json_reader, self.java_allocator)
+ with dict_provider:
+ self.java_arrow.c.Data.importIntoVectorSchemaRoot(
+ self.java_allocator,
+ self._wrap_c_array_ptr(c_array_ptr),
+ imported_batch, dict_provider)
+ self._assert_batches_equal(batch, imported_batch)
+ self._assert_dict_providers_equal(json_reader, dict_provider)
+
+ @property
+ def supports_releasing_memory(self):
+ return True
+
+ def close(self):
+ self.java_allocator.close()
+
+
class JavaTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
+ C_DATA_SCHEMA_EXPORTER = True
+ C_DATA_SCHEMA_IMPORTER = True
+ C_DATA_ARRAY_EXPORTER = True
+ C_DATA_ARRAY_IMPORTER = True
name = 'Java'
@@ -186,3 +341,9 @@ class JavaTester(Tester):
finally:
server.kill()
server.wait(5)
+
+ def make_c_data_exporter(self):
+ return JavaCDataExporter(self.debug, self.args)
+
+ def make_c_data_importer(self):
+ return JavaCDataImporter(self.debug, self.args)
diff --git a/docker-compose.yml b/docker-compose.yml
index 0e5034346e..e54c609e54 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1730,16 +1730,21 @@ services:
volumes: *conda-volumes
environment:
<<: [*common, *ccache]
- # tell archery where the arrow binaries are located
+ ARCHERY_INTEGRATION_WITH_RUST: 0
+ # Tell Archery where the arrow C++ binaries are located
ARROW_CPP_EXE_PATH: /build/cpp/debug
ARROW_GO_INTEGRATION: 1
- ARCHERY_INTEGRATION_WITH_RUST: 0
+ ARROW_JAVA_CDATA: "ON"
+ JAVA_JNI_CMAKE_ARGS: >-
+ -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF
+ -DARROW_JAVA_JNI_ENABLE_C=ON
command:
["/arrow/ci/scripts/rust_build.sh /arrow /build &&
/arrow/ci/scripts/cpp_build.sh /arrow /build &&
/arrow/ci/scripts/csharp_build.sh /arrow /build &&
/arrow/ci/scripts/go_build.sh /arrow &&
- /arrow/ci/scripts/java_build.sh /arrow /build &&
+ /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) &&
+ /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java &&
/arrow/ci/scripts/js_build.sh /arrow /build &&
/arrow/ci/scripts/integration_arrow.sh /arrow /build"]
diff --git
a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
index 7408bf7113..cd2a464f4f 100644
--- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
+++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
@@ -165,9 +165,9 @@ class BufferImportTypeVisitor implements
ArrowType.ArrowTypeVisitor<List<ArrowBu
return Collections.singletonList(importFixedBytes(type, 0,
UnionVector.TYPE_WIDTH));
case Dense:
return Arrays.asList(importFixedBytes(type, 0,
DenseUnionVector.TYPE_WIDTH),
- importFixedBytes(type, 0, DenseUnionVector.OFFSET_WIDTH));
+ importFixedBytes(type, 1, DenseUnionVector.OFFSET_WIDTH));
default:
- throw new UnsupportedOperationException("Importing buffers for type: " + type);
+ throw new UnsupportedOperationException("Importing buffers for union type: " + type);
}
}
diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java
b/java/c/src/main/java/org/apache/arrow/c/Format.java
index 315d3caad7..2875e46f74 100644
--- a/java/c/src/main/java/org/apache/arrow/c/Format.java
+++ b/java/c/src/main/java/org/apache/arrow/c/Format.java
@@ -138,6 +138,8 @@ final class Format {
return "tiD";
case YEAR_MONTH:
return "tiM";
+ case MONTH_DAY_NANO:
+ return "tin";
default:
throw new UnsupportedOperationException(
String.format("Interval type with unit %s is unsupported",
type.getUnit()));
@@ -277,6 +279,8 @@ final class Format {
return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
case "tiD":
return new ArrowType.Interval(IntervalUnit.DAY_TIME);
+ case "tin":
+ return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO);
case "+l":
return new ArrowType.List();
case "+L":
diff --git a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
index 21d88f6cd4..09a6afafa0 100644
--- a/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
+++ b/java/c/src/main/java/org/apache/arrow/c/SchemaImporter.java
@@ -44,7 +44,7 @@ final class SchemaImporter {
private static final Logger logger =
LoggerFactory.getLogger(SchemaImporter.class);
private static final int MAX_IMPORT_RECURSION_LEVEL = 64;
- private long nextDictionaryID = 1L;
+ private long nextDictionaryID = 0L;
private final BufferAllocator allocator;
diff --git a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
index 3f793f836d..9dcb262af4 100644
--- a/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/DictionaryTest.java
@@ -100,7 +100,7 @@ public class DictionaryTest {
dictVector.setSafe(2, "cc".getBytes());
dictVector.setValueCount(3);
- Dictionary dictionary = new Dictionary(dictVector, new
DictionaryEncoding(1L, false, /* indexType= */null));
+ Dictionary dictionary = new Dictionary(dictVector, new
DictionaryEncoding(0L, false, /* indexType= */null));
provider.put(dictionary);
// create vector and encode it
@@ -169,7 +169,7 @@ public class DictionaryTest {
dictVector.setSafe(3, "dd".getBytes());
dictVector.setSafe(4, "ee".getBytes());
dictVector.setValueCount(5);
- Dictionary dictionary = new Dictionary(dictVector, new
DictionaryEncoding(1L, false, /* indexType= */null));
+ Dictionary dictionary = new Dictionary(dictVector, new
DictionaryEncoding(0L, false, /* indexType= */null));
provider.put(dictionary);
Schema schema = new Schema(Collections.singletonList(vector.getField()));
diff --git a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
index 06401687a5..68d4fc2a81 100644
--- a/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
+++ b/java/c/src/test/java/org/apache/arrow/c/StreamTest.java
@@ -135,7 +135,7 @@ final class StreamTest {
@Test
public void roundtripDictionary() throws Exception {
final ArrowType.Int indexType = new ArrowType.Int(32, true);
- final DictionaryEncoding encoding = new DictionaryEncoding(1L, false,
indexType);
+ final DictionaryEncoding encoding = new DictionaryEncoding(0L, false,
indexType);
final Schema schema = new Schema(Collections.singletonList(
new Field("dict", new FieldType(/*nullable=*/true, indexType,
encoding), Collections.emptyList())));
final List<ArrowRecordBatch> batches = new ArrayList<>();
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
index 6e4c2764bd..1badf4b4ca 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/NullVector.java
@@ -192,6 +192,7 @@ public class NullVector implements FieldVector {
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf>
ownBuffers) {
Preconditions.checkArgument(ownBuffers.isEmpty(), "Null vector has no
buffers");
+ valueCount = fieldNode.getLength();
}
@Override
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
index 698ddac466..5323ddda83 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/compare/RangeEqualsVisitor.java
@@ -121,9 +121,11 @@ public class RangeEqualsVisitor implements
VectorVisitor<Boolean, Range> {
"rightStart %s must be non negative.", range.getRightStart());
Preconditions.checkArgument(range.getRightStart() + range.getLength() <=
right.getValueCount(),
- "(rightStart + length) %s out of range[0, %s].", 0,
right.getValueCount());
+ "(rightStart + length) %s out of range[0, %s].",
+ range.getRightStart() + range.getLength(), right.getValueCount());
Preconditions.checkArgument(range.getLeftStart() + range.getLength() <=
left.getValueCount(),
- "(leftStart + length) %s out of range[0, %s].", 0,
left.getValueCount());
+ "(leftStart + length) %s out of range[0, %s].",
+ range.getLeftStart() + range.getLength(), left.getValueCount());
return left.accept(this, range);
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
index 76e1eb9f66..f64c32be0f 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/dictionary/DictionaryProvider.java
@@ -21,6 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.VisibleForTesting;
+
/**
* A manager for association of dictionary IDs to their corresponding {@link
Dictionary}.
*/
@@ -35,7 +38,7 @@ public interface DictionaryProvider {
/**
* Implementation of {@link DictionaryProvider} that is backed by a hash-map.
*/
- class MapDictionaryProvider implements DictionaryProvider {
+ class MapDictionaryProvider implements AutoCloseable, DictionaryProvider {
private final Map<Long, Dictionary> map;
@@ -49,6 +52,23 @@ public interface DictionaryProvider {
}
}
+ /**
+ * Initialize the map structure from another provider, but with empty
vectors.
+ *
+ * @param other the {@link DictionaryProvider} to copy the ids and fields
from
+ * @param allocator allocator to create the empty vectors
+ */
+ // This is currently called using JPype by the integration tests.
+ @VisibleForTesting
+ public void copyStructureFrom(DictionaryProvider other, BufferAllocator
allocator) {
+ for (Long id : other.getDictionaryIds()) {
+ Dictionary otherDict = other.lookup(id);
+ Dictionary newDict = new
Dictionary(otherDict.getVector().getField().createVector(allocator),
+ otherDict.getEncoding());
+ put(newDict);
+ }
+ }
+
public void put(Dictionary dictionary) {
map.put(dictionary.getEncoding().getId(), dictionary);
}
@@ -62,5 +82,12 @@ public interface DictionaryProvider {
public Dictionary lookup(long id) {
return map.get(id);
}
+
+ @Override
+ public void close() {
+ for (Dictionary dictionary : map.values()) {
+ dictionary.getVector().close();
+ }
+ }
}
}
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index 742daeef25..0c23a664f6 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -237,6 +237,28 @@ public class JsonFileReader implements AutoCloseable,
DictionaryProvider {
}
}
+ /**
+ * Skips a number of record batches in the file.
+ *
+ * @param numBatches the number of batches to skip
+ * @return the actual number of skipped batches.
+ */
+ // This is currently called using JPype by the integration tests.
+ public int skip(int numBatches) throws IOException {
+ for (int i = 0; i < numBatches; ++i) {
+ JsonToken t = parser.nextToken();
+ if (t == START_OBJECT) {
+ parser.skipChildren();
+ assert parser.getCurrentToken() == END_OBJECT;
+ } else if (t == END_ARRAY) {
+ return i;
+ } else {
+ throw new IllegalArgumentException("Invalid token: " + t);
+ }
+ }
+ return numBatches;
+ }
+
private abstract class BufferReader {
protected abstract ArrowBuf read(BufferAllocator allocator, int count)
throws IOException;
@@ -692,7 +714,8 @@ public class JsonFileReader implements AutoCloseable,
DictionaryProvider {
}
private void readFromJsonIntoVector(Field field, FieldVector vector) throws
JsonParseException, IOException {
- TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType());
+ ArrowType type = field.getType();
+ TypeLayout typeLayout = TypeLayout.getTypeLayout(type);
List<BufferType> vectorTypes = typeLayout.getBufferTypes();
ArrowBuf[] vectorBuffers = new ArrowBuf[vectorTypes.size()];
/*
@@ -728,21 +751,18 @@ public class JsonFileReader implements AutoCloseable,
DictionaryProvider {
BufferType bufferType = vectorTypes.get(v);
nextFieldIs(bufferType.getName());
int innerBufferValueCount = valueCount;
- if (bufferType.equals(OFFSET) &&
!field.getType().getTypeID().equals(ArrowType.ArrowTypeID.Union)) {
- /* offset buffer has 1 additional value capacity */
+ if (bufferType.equals(OFFSET) && !(type instanceof ArrowType.Union)) {
+ /* offset buffer has 1 additional value capacity except for dense
unions */
innerBufferValueCount = valueCount + 1;
}
vectorBuffers[v] = readIntoBuffer(allocator, bufferType,
vector.getMinorType(), innerBufferValueCount);
}
- if (vectorBuffers.length == 0) {
- readToken(END_OBJECT);
- return;
- }
-
int nullCount = 0;
- if (!(vector.getField().getFieldType().getType() instanceof
ArrowType.Union)) {
+ if (type instanceof ArrowType.Null) {
+ nullCount = valueCount;
+ } else if (!(type instanceof ArrowType.Union)) {
nullCount = BitVectorHelper.getNullCount(vectorBuffers[0], valueCount);
}
final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount,
nullCount);
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 83a8ece0bf..f81d049a92 100644
---
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -112,8 +112,8 @@ public class ArrowRecordBatch implements ArrowMessage {
}
long size = arrowBuf.readableBytes();
arrowBuffers.add(new ArrowBuffer(offset, size));
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size);
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size);
}
offset += size;
if (alignBuffers) { // align on 8 byte boundaries
diff --git
a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
index 741972b4ad..0c9ad1e275 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/util/Validator.java
@@ -17,6 +17,7 @@
package org.apache.arrow.vector.util;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -85,6 +86,31 @@ public class Validator {
}
}
+ /**
+ * Validate two dictionary providers are equal in structure and contents.
+ */
+ public static void compareDictionaryProviders(
+ DictionaryProvider provider1,
+ DictionaryProvider provider2) {
+ List<Long> ids1 = new ArrayList(provider1.getDictionaryIds());
+ List<Long> ids2 = new ArrayList(provider2.getDictionaryIds());
+ java.util.Collections.sort(ids1);
+ java.util.Collections.sort(ids2);
+ if (!ids1.equals(ids2)) {
+ throw new IllegalArgumentException("Different ids in dictionary providers:\n" +
+ ids1 + "\n" + ids2);
+ }
+ for (long id : ids1) {
+ Dictionary dict1 = provider1.lookup(id);
+ Dictionary dict2 = provider2.lookup(id);
+ try {
+ compareFieldVectors(dict1.getVector(), dict2.getVector());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Different dictionaries:\n" + dict1
+ "\n" + dict2, e);
+ }
+ }
+ }
+
/**
* Validate two arrow vectorSchemaRoot are equal.
*