lidavidm commented on code in PR #12830: URL: https://github.com/apache/arrow/pull/12830#discussion_r846161414
########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcArray.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc; + +import java.sql.Array; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Map; + +import org.apache.arrow.driver.jdbc.accessor.impl.complex.AbstractArrowFlightJdbcListVectorAccessor; +import org.apache.arrow.driver.jdbc.utils.SqlTypes; +import org.apache.arrow.memory.util.LargeMemoryUtil; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.util.TransferPair; + +/** + * Implementation of {@link Array} using an underlying {@link FieldVector}. + * + * @see AbstractArrowFlightJdbcListVectorAccessor + */ +public class ArrowFlightJdbcArray implements Array { + + private final FieldVector dataVector; + private final long startOffset; + private final long valuesCount; + + /** + * Instantiate an {@link Array} backed up by given {@link FieldVector}, limited by a start offset and values count. + * + * @param dataVector underlying FieldVector, containing the Array items. + * @param startOffset offset from FieldVector pointing to this Array's first value. + * @param valuesCount how many items this Array contains. + */ + public ArrowFlightJdbcArray(FieldVector dataVector, long startOffset, long valuesCount) { + this.dataVector = dataVector; + this.startOffset = startOffset; + this.valuesCount = valuesCount; + } + + @Override + public String getBaseTypeName() { + final ArrowType arrowType = this.dataVector.getField().getType(); + return SqlTypes.getSqlTypeNameFromArrowType(arrowType); + } + + @Override + public int getBaseType() { + final ArrowType arrowType = this.dataVector.getField().getType(); + return SqlTypes.getSqlTypeIdFromArrowType(arrowType); + } + + @Override + public Object getArray() throws SQLException { + return getArray(null); + } + + @Override + public Object getArray(Map<String, Class<?>> map) throws SQLException { + if (map != null) { + throw new SQLFeatureNotSupportedException(); + } + + return getArrayNoBoundCheck(this.dataVector, this.startOffset, this.valuesCount); + } + + @Override + public Object getArray(long index, int count) throws SQLException { + return getArray(index, count, null); + } + + private void checkBoundaries(long index, int count) { + if (index < 0 || index + count > this.startOffset + this.valuesCount) { + throw new ArrayIndexOutOfBoundsException(); + } + } + + private static Object getArrayNoBoundCheck(ValueVector dataVector, long start, long count) { + Object[] result = new Object[LargeMemoryUtil.checkedCastToInt(count)]; + for (int i = 0; i < count; i++) { + result[i] = dataVector.getObject(LargeMemoryUtil.checkedCastToInt(start + i)); + } + + return result; + } + + @Override + public Object getArray(long index, int count, Map<String, Class<?>> map) throws SQLException { + if (map != null) { + throw new SQLFeatureNotSupportedException(); + } + + checkBoundaries(index, count); + return getArrayNoBoundCheck(this.dataVector, + LargeMemoryUtil.checkedCastToInt(this.startOffset + index), count); + } + + @Override + public ResultSet getResultSet() throws SQLException { + return this.getResultSet(null); + } + + @Override + public ResultSet getResultSet(Map<String, Class<?>> map) throws SQLException { + if (map != null) { + throw new SQLFeatureNotSupportedException(); + } + + return getResultSetNoBoundariesCheck(this.dataVector, this.startOffset, this.valuesCount); + } + + @Override + public ResultSet getResultSet(long index, int count) throws SQLException { + return getResultSet(index, count, null); + } + + private static ResultSet getResultSetNoBoundariesCheck(ValueVector dataVector, long start, + long count) + throws SQLException { + TransferPair transferPair = dataVector.getTransferPair(dataVector.getAllocator()); + transferPair.splitAndTransfer(LargeMemoryUtil.checkedCastToInt(start), + LargeMemoryUtil.checkedCastToInt(count)); + FieldVector vectorSlice = (FieldVector) transferPair.getTo(); + + VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.of(vectorSlice); + return ArrowFlightJdbcVectorSchemaRootResultSet.fromVectorSchemaRoot(vectorSchemaRoot); + } + + @Override + public ResultSet getResultSet(long index, int count, Map<String, Class<?>> map) + throws SQLException { + if (map != null) { + throw new SQLFeatureNotSupportedException(); + } + + checkBoundaries(index, count); + return getResultSetNoBoundariesCheck(this.dataVector, + LargeMemoryUtil.checkedCastToInt(this.startOffset + index), count); + } + + @Override + public void free() { + Review Comment: Might it be reasonable to increment/decrement the underlying vector's reference count here? Though I suppose I see this is meant to be scoped to a transaction and presumably shouldn't be kept alive past the result set itself. ########## java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/RootAllocatorTestRule.java: ########## @@ -0,0 +1,820 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc.utils; + +import java.math.BigDecimal; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarBinaryVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecTZVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.UInt2Vector; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.arrow.vector.complex.LargeListVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionFixedSizeListWriter; +import org.apache.arrow.vector.complex.impl.UnionLargeListWriter; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +public class RootAllocatorTestRule implements TestRule, AutoCloseable { Review Comment: nit, but since this exposes BufferAllocator not RootAllocator, maybe this could just be AllocatorTestRule/getAllocator? ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/calendar/ArrowFlightJdbcTimeStampVectorAccessor.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc.accessor.impl.calendar; + +import static org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Getter; +import static org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Holder; +import static org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.createGetter; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Calendar; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; +import java.util.function.IntSupplier; + +import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor; +import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.util.DateUtility; + +/** + * Accessor for the Arrow types extending from {@link TimeStampVector}. + */ +public class ArrowFlightJdbcTimeStampVectorAccessor extends ArrowFlightJdbcAccessor { + + private final TimeZone timeZone; + private final Getter getter; + private final TimeUnit timeUnit; + private final LongToLocalDateTime longToLocalDateTime; + private final Holder holder; + + /** + * Functional interface used to convert a number (in any time resolution) to LocalDateTime. + */ + interface LongToLocalDateTime { + LocalDateTime fromLong(long value); + } + + /** + * Instantiate a ArrowFlightJdbcTimeStampVectorAccessor for given vector. + */ + public ArrowFlightJdbcTimeStampVectorAccessor(TimeStampVector vector, + IntSupplier currentRowSupplier, + ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) { + super(currentRowSupplier, setCursorWasNull); + this.holder = new Holder(); + this.getter = createGetter(vector); + + this.timeZone = getTimeZoneForVector(vector); + this.timeUnit = getTimeUnitForVector(vector); + this.longToLocalDateTime = getLongToLocalDateTimeForVector(vector, this.timeZone); + } + + @Override + public Class<?> getObjectClass() { + return Timestamp.class; + } + + @Override + public Object getObject() { + return this.getTimestamp(null); + } + + private LocalDateTime getLocalDateTime(Calendar calendar) { + getter.get(getCurrentRow(), holder); + this.wasNull = holder.isSet == 0; + this.wasNullConsumer.setWasNull(this.wasNull); + if (this.wasNull) { + return null; + } + + long value = holder.value; + + LocalDateTime localDateTime = this.longToLocalDateTime.fromLong(value); + + if (calendar != null) { + TimeZone timeZone = calendar.getTimeZone(); + long millis = this.timeUnit.toMillis(value); + localDateTime = localDateTime + .minus(timeZone.getOffset(millis) - this.timeZone.getOffset(millis), ChronoUnit.MILLIS); + } + return localDateTime; + } + + @Override + public Date getDate(Calendar calendar) { + LocalDateTime localDateTime = getLocalDateTime(calendar); + if (localDateTime == null) { + return null; + } + + return new Date(Timestamp.valueOf(localDateTime).getTime()); + } + + @Override + public Time getTime(Calendar calendar) { + LocalDateTime localDateTime = getLocalDateTime(calendar); + if (localDateTime == null) { + return null; + } + + return new Time(Timestamp.valueOf(localDateTime).getTime()); + } + + @Override + public Timestamp getTimestamp(Calendar calendar) { + LocalDateTime localDateTime = getLocalDateTime(calendar); + if (localDateTime == null) { + return null; + } + + return Timestamp.valueOf(localDateTime); + } + + protected static TimeUnit getTimeUnitForVector(TimeStampVector vector) { + ArrowType.Timestamp arrowType = + (ArrowType.Timestamp) vector.getField().getFieldType().getType(); + + switch (arrowType.getUnit()) { + case NANOSECOND: + return TimeUnit.NANOSECONDS; + case MICROSECOND: + return TimeUnit.MICROSECONDS; + case MILLISECOND: + return TimeUnit.MILLISECONDS; + case SECOND: + return TimeUnit.SECONDS; + default: + throw new UnsupportedOperationException("Invalid Arrow time unit"); + } Review Comment: nit, but this would probably be nice as a helper method on the arrow time unit itself ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc; + +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler; +import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.util.Preconditions; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaFactory; + +import io.netty.util.concurrent.DefaultThreadFactory; + +/** + * Connection to the Arrow Flight server. + */ +public final class ArrowFlightConnection extends AvaticaConnection { + + private final BufferAllocator allocator; + private final ArrowFlightSqlClientHandler clientHandler; + private final ArrowFlightConnectionConfigImpl config; + private ExecutorService executorService; + + /** + * Creates a new {@link ArrowFlightConnection}. + * + * @param driver the {@link ArrowFlightJdbcDriver} to use. + * @param factory the {@link AvaticaFactory} to use. + * @param url the URL to use. + * @param properties the {@link Properties} to use. + * @param config the {@link ArrowFlightConnectionConfigImpl} to use. + * @param allocator the {@link BufferAllocator} to use. + * @param clientHandler the {@link ArrowFlightSqlClientHandler} to use. + */ + private ArrowFlightConnection(final ArrowFlightJdbcDriver driver, final AvaticaFactory factory, + final String url, final Properties properties, + final ArrowFlightConnectionConfigImpl config, + final BufferAllocator allocator, + final ArrowFlightSqlClientHandler clientHandler) { + super(driver, factory, url, properties); + this.config = Preconditions.checkNotNull(config, "Config cannot be null."); + this.allocator = Preconditions.checkNotNull(allocator, "Allocator cannot be null."); + this.clientHandler = Preconditions.checkNotNull(clientHandler, "Handler cannot be null."); + } + + /** + * Creates a new {@link ArrowFlightConnection} to a {@link FlightClient}. + * + * @param driver the {@link ArrowFlightJdbcDriver} to use. + * @param factory the {@link AvaticaFactory} to use. + * @param url the URL to establish the connection to. + * @param properties the {@link Properties} to use for this session. + * @param allocator the {@link BufferAllocator} to use. + * @return a new {@link ArrowFlightConnection}. + * @throws SQLException on error. + */ + static ArrowFlightConnection createNewConnection(final ArrowFlightJdbcDriver driver, + final AvaticaFactory factory, + final String url, final Properties properties, + final BufferAllocator allocator) + throws SQLException { + final ArrowFlightConnectionConfigImpl config = new ArrowFlightConnectionConfigImpl(properties); + final ArrowFlightSqlClientHandler clientHandler = createNewClientHandler(config, allocator); + return new ArrowFlightConnection(driver, factory, url, properties, config, allocator, + clientHandler); + } + + private static ArrowFlightSqlClientHandler createNewClientHandler( + final ArrowFlightConnectionConfigImpl config, + final BufferAllocator allocator) throws SQLException { + try { + return new ArrowFlightSqlClientHandler.Builder() + .withHost(config.getHost()) + .withPort(config.getPort()) + .withUsername(config.getUser()) + .withPassword(config.getPassword()) + .withKeyStorePath(config.getKeyStorePath()) + .withKeyStorePassword(config.keystorePassword()) + .withBufferAllocator(allocator) + .withTlsEncryption(config.useTls()) + .withDisableCertificateVerification(config.getDisableCertificateVerification()) + .withToken(config.getToken()) + .withCallOptions(config.toCallOption()) + .build(); + } catch (final SQLException e) { + try { + allocator.close(); + } catch (final Exception allocatorCloseEx) { + e.addSuppressed(allocatorCloseEx); + } + throw e; + } + } + + void reset() throws SQLException { + // Clean up any open Statements + try { + AutoCloseables.close(statementMap.values()); + } catch (final Exception e) { + throw AvaticaConnection.HELPER.createException(e.getMessage(), e); + } + + statementMap.clear(); + + // Reset Holdability + this.setHoldability(this.metaData.getResultSetHoldability()); + + // Reset Meta + ((ArrowFlightMetaImpl) this.meta).setDefaultConnectionProperties(); + } + + /** + * Gets the client {@link #clientHandler} backing this connection. + * + * @return the handler. + */ + ArrowFlightSqlClientHandler getClientHandler() throws SQLException { + return clientHandler; + } + + /** + * Gets the {@link ExecutorService} of this connection. + * + * @return the {@link #executorService}. + */ + synchronized ExecutorService getExecutorService() { + return executorService = executorService == null ? + Executors.newFixedThreadPool(config.threadPoolSize(), + new DefaultThreadFactory(getClass().getSimpleName())) : + executorService; + } + + @Override + public Properties getClientInfo() { + final Properties copy = new Properties(); + copy.putAll(info); + return copy; + } + + @Override + public void close() throws SQLException { + if (executorService != null) { + executorService.shutdown(); + } + + try { + AutoCloseables.close(clientHandler); + allocator.getChildAllocators().forEach(AutoCloseables::closeNoChecked); + AutoCloseables.close(allocator); + + super.close(); Review Comment: It might be good to do this all in one call to AutoCloseables to make sure things get closed even if one of the close calls fails. Also, I'm curious why we have to explicitly clean up child allocators here ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.DriverVersion; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.avatica.org.apache.http.NameValuePair; +import org.apache.calcite.avatica.org.apache.http.client.utils.URLEncodedUtils; + +/** + * JDBC driver for querying data from an Apache Arrow Flight server. + */ +public class ArrowFlightJdbcDriver extends UnregisteredDriver { + + private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight://"; + private static final String CONNECTION_STRING_EXPECTED = "jdbc:arrow-flight://[host][:port][?param1=value&...]"; + private static DriverVersion version; + + static { + // Special code for supporting Java9 and higher. + // Netty requires some extra properties to unlock some native memory management api + // Setting this property if not already set externally + // This has to be done before any netty class is being loaded + final String key = "cfjd.io.netty.tryReflectionSetAccessible"; + final String tryReflectionSetAccessible = System.getProperty(key); + if (tryReflectionSetAccessible == null) { + System.setProperty(key, Boolean.TRUE.toString()); + } + + new ArrowFlightJdbcDriver().register(); + } + + @Override + public ArrowFlightConnection connect(final String url, final Properties info) + throws SQLException { + final Properties properties = new Properties(info); + properties.putAll(info); Review Comment: Isn't the second call redundant? ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcCursor.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc; + + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.calcite.avatica.ColumnMetaData; +import org.apache.calcite.avatica.util.AbstractCursor; +import org.apache.calcite.avatica.util.ArrayImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Arrow Flight Jdbc's Cursor class. + */ +public class ArrowFlightJdbcCursor extends AbstractCursor { + + private static final Logger LOGGER; + private final VectorSchemaRoot root; + private final int rowCount; + private int currentRow = -1; + + static { + LOGGER = LoggerFactory.getLogger(ArrowFlightJdbcCursor.class); Review Comment: nit: why is this not initialized inline above? ########## java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/RootAllocatorTestRule.java: ########## @@ -0,0 +1,820 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc.utils; + +import java.math.BigDecimal; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarBinaryVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecTZVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.UInt2Vector; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.complex.FixedSizeListVector; +import org.apache.arrow.vector.complex.LargeListVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionFixedSizeListWriter; +import org.apache.arrow.vector.complex.impl.UnionLargeListWriter; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +public class RootAllocatorTestRule implements TestRule, AutoCloseable { + + public static final byte MAX_VALUE = Byte.MAX_VALUE; + private final BufferAllocator rootAllocator = new RootAllocator(); + + private final Random random = new Random(10); + + @Override + public Statement apply(Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + try { + base.evaluate(); + } finally { + close(); + } + } + }; + } + + public BufferAllocator getRootAllocator() { + return rootAllocator; + } + + @Override + public void close() throws Exception { + this.rootAllocator.getChildAllocators().forEach(BufferAllocator::close); Review Comment: Hmm, generally code that creates a child allocator should be responsible for cleaning it up itself ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/utils/ClientAuthenticationUtils.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc.client.utils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; + +import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.auth2.BasicAuthCredentialWriter; +import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware; +import org.apache.arrow.flight.grpc.CredentialCallOption; +import org.apache.arrow.util.Preconditions; +import org.bouncycastle.openssl.jcajce.JcaPEMWriter; + +/** + * Utils for {@link FlightClientHandler} authentication. + */ +public final class ClientAuthenticationUtils { + + private ClientAuthenticationUtils() { + // Prevent instantiation. + } + + /** + * Gets the {@link CredentialCallOption} for the provided authentication info. + * + * @param client the client. + * @param credential the credential as CallOptions. + * @param options the {@link CallOption}s to use. + * @return the credential call option. + */ + public static CredentialCallOption getAuthenticate(final FlightClient client, + final CredentialCallOption credential, + final CallOption... options) { + + final List<CallOption> theseOptions = new ArrayList<>(); + theseOptions.add(credential); + theseOptions.addAll(Arrays.asList(options)); + client.handshake(theseOptions.toArray(new CallOption[0])); + + return (CredentialCallOption) theseOptions.get(0); Review Comment: Why not return `credential` directly? ########## java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.driver.jdbc; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.DriverVersion; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.avatica.org.apache.http.NameValuePair; +import org.apache.calcite.avatica.org.apache.http.client.utils.URLEncodedUtils; + +/** + * JDBC driver for querying data from an Apache Arrow Flight server. + */ +public class ArrowFlightJdbcDriver extends UnregisteredDriver { + + private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight://"; + private static final String CONNECTION_STRING_EXPECTED = "jdbc:arrow-flight://[host][:port][?param1=value&...]"; + private static DriverVersion version; + + static { + // Special code for supporting Java9 and higher. + // Netty requires some extra properties to unlock some native memory management api + // Setting this property if not already set externally + // This has to be done before any netty class is being loaded + final String key = "cfjd.io.netty.tryReflectionSetAccessible"; + final String tryReflectionSetAccessible = System.getProperty(key); + if (tryReflectionSetAccessible == null) { + System.setProperty(key, Boolean.TRUE.toString()); + } Review Comment: This may surprise people; I think we discussed this before though and we don't want to force people to set this property everywhere? (Which is understandable…) It should be called out in documentation, though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
