lidavidm commented on code in PR #35603:
URL: https://github.com/apache/arrow/pull/35603#discussion_r1258281240
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java:
##########
@@ -110,6 +110,7 @@ private static ArrowFlightSqlClientHandler
createNewClientHandler(
} catch (final SQLException e) {
try {
allocator.close();
+ allocator.getChildAllocators().forEach(BufferAllocator::close);
Review Comment:
1) Can we use AutoCloseables?
2) This needs to come before the root allocator is closed anyway.
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -204,6 +259,21 @@ public boolean next() throws SQLException {
}
}
+ private void cleanUpResources() throws Exception {
+ if (flightStreamQueue != null) {
+ // flightStreamQueue should close currentFlightStream internally
+ flightStreamQueue.close();
+ } else if (currentFlightStream != null) {
+ // close is only called for currentFlightStream if there's no queue
+ currentFlightStream.close();
+ }
+
+ List<VectorSchemaRoot> roots = new LinkedList<>();
+ vectorSchemaRoots.drainTo(roots);
+ roots.forEach(AutoCloseables::closeNoChecked);
+ ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
Review Comment:
It would be preferable to do this all as one big AutoCloseables call if
possible (also, is LinkedList strictly necessary here vs ArrayList?)
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java:
##########
@@ -92,18 +93,11 @@ protected AvaticaResultSet execute() throws SQLException {
throw new RuntimeException("Can only execute with
execute(VectorSchemaRoot)");
}
- void execute(final VectorSchemaRoot vectorSchemaRoot) {
- final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
- final List<ColumnMetaData> columns =
ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
- signature.columns.clear();
- signature.columns.addAll(columns);
-
- this.vectorSchemaRoot = vectorSchemaRoot;
- execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot),
this.signature.columns);
- }
-
void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
- final List<ColumnMetaData> columns =
ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
+ final List<Field> fields = Optional.ofNullable(schema)
Review Comment:
nit: can't this just be a ternary? There's no reason to box/unbox an
Optional here
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -45,15 +54,18 @@
*/
public final class ArrowFlightJdbcFlightStreamResultSet
extends ArrowFlightJdbcVectorSchemaRootResultSet {
-
+ private static final String BLOCKING_QUEUE_PARAM = "buffersize";
Review Comment:
Can we add this to the documentation?
Can we add a test of setting this parameter?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
}
- private void executeForCurrentFlightStream() throws SQLException {
- final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+ private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+ VectorSchemaRoot theRoot =
VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+ VectorLoader loader = new VectorLoader(theRoot);
+ VectorUnloader unloader = new VectorUnloader(originalRoot);
+ try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+ loader.load(recordBatch);
+ }
+ return theRoot;
+ }
+ private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+ VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+ VectorSchemaRoot transformedRoot = null;
if (transformer != null) {
try {
- currentVectorSchemaRoot = transformer.transform(originalRoot,
currentVectorSchemaRoot);
+ transformedRoot = transformer.transform(theRoot, null);
+ theRoot.close();
} catch (final Exception e) {
throw new SQLException("Failed to transform VectorSchemaRoot.", e);
}
- } else {
- currentVectorSchemaRoot = originalRoot;
}
- if (schema != null) {
- execute(currentVectorSchemaRoot, schema);
- } else {
- execute(currentVectorSchemaRoot);
+ try {
+ vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not put root to the queue", e);
}
}
+ private void executeNextRoot() throws SQLException {
+ try {
+ ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+ currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+ execute(currentRoot, schema);
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not take root from the queue", e);
+ }
+ }
+
+ private void storeRootsFromStreamAsync() {
+ CompletableFuture.runAsync(() -> {
+ while (vectorSchemaRoots.remainingCapacity() > 0) {
Review Comment:
Multiple calls to `next` might spawn this task multiple times, and then those
tasks may push too many items onto the queue, right?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
}
- private void executeForCurrentFlightStream() throws SQLException {
- final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+ private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+ VectorSchemaRoot theRoot =
VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+ VectorLoader loader = new VectorLoader(theRoot);
+ VectorUnloader unloader = new VectorUnloader(originalRoot);
+ try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+ loader.load(recordBatch);
+ }
+ return theRoot;
+ }
+ private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+ VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+ VectorSchemaRoot transformedRoot = null;
if (transformer != null) {
try {
- currentVectorSchemaRoot = transformer.transform(originalRoot,
currentVectorSchemaRoot);
+ transformedRoot = transformer.transform(theRoot, null);
+ theRoot.close();
} catch (final Exception e) {
throw new SQLException("Failed to transform VectorSchemaRoot.", e);
}
- } else {
- currentVectorSchemaRoot = originalRoot;
}
- if (schema != null) {
- execute(currentVectorSchemaRoot, schema);
- } else {
- execute(currentVectorSchemaRoot);
+ try {
+ vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not put root to the queue", e);
}
}
+ private void executeNextRoot() throws SQLException {
+ try {
+ ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+ currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
Review Comment:
Do we want to have a hard timeout like this? We don't know how long it might
take for the server to respond
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
Review Comment:
Can this also be just initialized in the constructor?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
}
- private void executeForCurrentFlightStream() throws SQLException {
- final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+ private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+ VectorSchemaRoot theRoot =
VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+ VectorLoader loader = new VectorLoader(theRoot);
+ VectorUnloader unloader = new VectorUnloader(originalRoot);
+ try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+ loader.load(recordBatch);
+ }
+ return theRoot;
+ }
+ private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+ VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+ VectorSchemaRoot transformedRoot = null;
if (transformer != null) {
try {
- currentVectorSchemaRoot = transformer.transform(originalRoot,
currentVectorSchemaRoot);
+ transformedRoot = transformer.transform(theRoot, null);
+ theRoot.close();
} catch (final Exception e) {
throw new SQLException("Failed to transform VectorSchemaRoot.", e);
}
- } else {
- currentVectorSchemaRoot = originalRoot;
}
- if (schema != null) {
- execute(currentVectorSchemaRoot, schema);
- } else {
- execute(currentVectorSchemaRoot);
+ try {
+ vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not put root to the queue", e);
}
}
+ private void executeNextRoot() throws SQLException {
+ try {
+ ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+ currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+ execute(currentRoot, schema);
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not take root from the queue", e);
+ }
+ }
+
+ private void storeRootsFromStreamAsync() {
+ CompletableFuture.runAsync(() -> {
+ while (vectorSchemaRoots.remainingCapacity() > 0) {
+ try {
+ currentFlightStream =
ofNullable(currentFlightStream).orElse(getNextFlightStream(false));
+ streamHasNext = currentFlightStream.next();
+ if (!streamHasNext) {
+ flightStreamQueue.enqueue(currentFlightStream);
+ }
+ storeRoot(currentFlightStream.getRoot());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
@Override
public boolean next() throws SQLException {
Review Comment:
The control flow between here and the background task is hard to follow; it
would help to document what's going on.
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -73,6 +86,7 @@ public final class ArrowFlightJdbcFlightStreamResultSet
final Meta.Frame firstFrame) throws
SQLException {
super(null, state, signature, resultSetMetaData, timeZone, firstFrame);
this.connection = connection;
+ initializeVectorSchemaRootsQueue();
}
Review Comment:
Would it be possible to have both constructors delegate to a single
constructor, then inline initializeVectorSchemaRootsQueue into that constructor?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
}
- private void executeForCurrentFlightStream() throws SQLException {
- final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+ private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+ VectorSchemaRoot theRoot =
VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+ VectorLoader loader = new VectorLoader(theRoot);
+ VectorUnloader unloader = new VectorUnloader(originalRoot);
+ try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+ loader.load(recordBatch);
+ }
+ return theRoot;
+ }
+ private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+ VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+ VectorSchemaRoot transformedRoot = null;
if (transformer != null) {
try {
- currentVectorSchemaRoot = transformer.transform(originalRoot,
currentVectorSchemaRoot);
+ transformedRoot = transformer.transform(theRoot, null);
+ theRoot.close();
} catch (final Exception e) {
throw new SQLException("Failed to transform VectorSchemaRoot.", e);
}
- } else {
- currentVectorSchemaRoot = originalRoot;
}
- if (schema != null) {
- execute(currentVectorSchemaRoot, schema);
- } else {
- execute(currentVectorSchemaRoot);
+ try {
+ vectorSchemaRoots.put(ofNullable(transformedRoot).orElse(theRoot));
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not put root to the queue", e);
}
}
+ private void executeNextRoot() throws SQLException {
+ try {
+ ofNullable(currentRoot).ifPresent(AutoCloseables::closeNoChecked);
+ currentRoot = vectorSchemaRoots.poll(10, TimeUnit.SECONDS);
+ execute(currentRoot, schema);
+ } catch (InterruptedException e) {
+ throw new SQLException("Could not take root from the queue", e);
+ }
+ }
+
+ private void storeRootsFromStreamAsync() {
+ CompletableFuture.runAsync(() -> {
Review Comment:
This uses a global thread pool; can we instead use a pool we control?
##########
java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java:
##########
@@ -130,39 +146,89 @@ protected AvaticaResultSet execute() throws SQLException {
}
private void execute(final FlightInfo flightInfo) throws SQLException {
- loadNewQueue();
+ // load new FlightStreamQueue
+ ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
+ flightStreamQueue = createNewQueue(connection.getExecutorService());
+
+ // load new FlightStream
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
- loadNewFlightStream();
+ ofNullable(currentFlightStream).ifPresent(AutoCloseables::closeNoChecked);
+ currentFlightStream = getNextFlightStream(true);
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
- executeForCurrentFlightStream();
+ storeRootsFromStreamAsync();
+ executeNextRoot();
+ }
+ }
+
+ private BufferAllocator getAllocator() {
+ if (allocator == null) {
+ allocator =
connection.getBufferAllocator().newChildAllocator("vsr-copier", 0,
Long.MAX_VALUE);
}
+
+ return allocator;
}
- private void executeForCurrentFlightStream() throws SQLException {
- final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
+ private VectorSchemaRoot cloneRoot(VectorSchemaRoot originalRoot) {
+ VectorSchemaRoot theRoot =
VectorSchemaRoot.create(originalRoot.getSchema(), getAllocator());
+ VectorLoader loader = new VectorLoader(theRoot);
+ VectorUnloader unloader = new VectorUnloader(originalRoot);
+ try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+ loader.load(recordBatch);
+ }
+ return theRoot;
+ }
+ private void storeRoot(VectorSchemaRoot originalRoot) throws SQLException {
+ VectorSchemaRoot theRoot = cloneRoot(originalRoot);
+ VectorSchemaRoot transformedRoot = null;
if (transformer != null) {
try {
- currentVectorSchemaRoot = transformer.transform(originalRoot,
currentVectorSchemaRoot);
+ transformedRoot = transformer.transform(theRoot, null);
+ theRoot.close();
Review Comment:
Can we use try-with-resources here over explicit close?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]