lidavidm commented on code in PR #13800:
URL: https://github.com/apache/arrow/pull/13800#discussion_r938836014


##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/text/ArrowFlightJdbcVarCharVectorAccessor.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.text;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.CharArrayReader;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.function.IntSupplier;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import org.apache.arrow.driver.jdbc.utils.DateTimeUtils;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.util.Text;
+
+/**
+ * Accessor for the Arrow types: {@link VarCharVector} and {@link 
LargeVarCharVector}.
+ */
+public class ArrowFlightJdbcVarCharVectorAccessor extends 
ArrowFlightJdbcAccessor {
+
+  /**
+   * Functional interface to help integrating VarCharVector and 
LargeVarCharVector.
+   */
+  @FunctionalInterface
+  interface Getter {
+    byte[] get(int index);
+  }
+
+  private final Getter getter;
+
+  public ArrowFlightJdbcVarCharVectorAccessor(VarCharVector vector,
+                                              IntSupplier currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector::get, currentRowSupplier, setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcVarCharVectorAccessor(LargeVarCharVector vector,
+                                              IntSupplier currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector::get, currentRowSupplier, setCursorWasNull);
+  }
+
+  ArrowFlightJdbcVarCharVectorAccessor(Getter getter,
+                                       IntSupplier currentRowSupplier,
+                                       
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    super(currentRowSupplier, setCursorWasNull);
+    this.getter = getter;
+  }
+
+  @Override
+  public Class<?> getObjectClass() {
+    return String.class;
+  }
+
+  @Override
+  public String getObject() {
+    final byte[] bytes = getBytes();
+    return bytes == null ? null : new String(bytes, UTF_8);
+  }
+
+  @Override
+  public String getString() {
+    return getObject();
+  }
+
+  @Override
+  public byte[] getBytes() {
+    final byte[] bytes = this.getter.get(getCurrentRow());
+    this.wasNull = bytes == null;
+    this.wasNullConsumer.setWasNull(this.wasNull);
+    return bytes;
+  }
+
+  @Override
+  public boolean getBoolean() throws SQLException {
+    String value = getString();
+    if (value == null || value.equalsIgnoreCase("false") || value.equals("0")) 
{
+      return false;
+    } else if (value.equalsIgnoreCase("true") || value.equals("1")) {
+      return true;
+    } else {
+      throw new SQLException("It is not possible to convert this value to 
boolean: " + value);
+    }
+  }
+
+  @Override
+  public byte getByte() throws SQLException {
+    try {
+      return Byte.parseByte(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public short getShort() throws SQLException {
+    try {
+      return Short.parseShort(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public int getInt() throws SQLException {
+    try {
+      return Integer.parseInt(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public long getLong() throws SQLException {
+    try {
+      return Long.parseLong(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public float getFloat() throws SQLException {
+    try {
+      return Float.parseFloat(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public double getDouble() throws SQLException {
+    try {
+      return Double.parseDouble(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public BigDecimal getBigDecimal() throws SQLException {
+    try {
+      return new BigDecimal(this.getString());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public BigDecimal getBigDecimal(int i) throws SQLException {
+    try {
+      return BigDecimal.valueOf(this.getLong(), i);
+    } catch (Exception e) {
+      throw new SQLException(e);
+    }
+  }
+
+  @Override
+  public InputStream getAsciiStream() {
+    final String textValue = getString();
+    if (textValue == null) {
+      return null;
+    }
+    // Already in UTF-8
+    return new ByteArrayInputStream(textValue.getBytes(US_ASCII));

Review Comment:
   Why US_ASCII here instead of UTF_8?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/numeric/ArrowFlightJdbcBaseIntVectorAccessor.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.numeric;
+
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.numeric.ArrowFlightJdbcNumericGetter.Getter;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.numeric.ArrowFlightJdbcNumericGetter.createGetter;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.function.IntSupplier;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import 
org.apache.arrow.driver.jdbc.accessor.impl.numeric.ArrowFlightJdbcNumericGetter.NumericHolder;
+import org.apache.arrow.vector.BaseIntVector;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt1Vector;
+import org.apache.arrow.vector.UInt2Vector;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.UInt8Vector;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+/**
+ * Accessor for the arrow types: TinyIntVector, SmallIntVector, IntVector, 
BigIntVector,
+ * UInt1Vector, UInt2Vector, UInt4Vector and UInt8Vector.
+ */
+public class ArrowFlightJdbcBaseIntVectorAccessor extends 
ArrowFlightJdbcAccessor {
+
+  private final MinorType type;
+  private final boolean isUnsigned;
+  private final int bytesToAllocate;
+  private final Getter getter;
+  private final NumericHolder holder;
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(UInt1Vector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, true, UInt1Vector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(UInt2Vector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, true, UInt2Vector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(UInt4Vector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, true, UInt4Vector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(UInt8Vector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, true, UInt8Vector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(TinyIntVector vector, 
IntSupplier currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, false, TinyIntVector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(SmallIntVector vector, 
IntSupplier currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, false, SmallIntVector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(IntVector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, false, IntVector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  public ArrowFlightJdbcBaseIntVectorAccessor(BigIntVector vector, IntSupplier 
currentRowSupplier,
+                                              
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    this(vector, currentRowSupplier, false, BigIntVector.TYPE_WIDTH, 
setCursorWasNull);
+  }
+
+  private ArrowFlightJdbcBaseIntVectorAccessor(BaseIntVector vector, 
IntSupplier currentRowSupplier,
+                                               boolean isUnsigned, int 
bytesToAllocate,
+                                               
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    super(currentRowSupplier, setCursorWasNull);
+    this.type = vector.getMinorType();
+    this.holder = new NumericHolder();
+    this.getter = createGetter(vector);
+    this.isUnsigned = isUnsigned;
+    this.bytesToAllocate = bytesToAllocate;
+  }
+
+  @Override
+  public long getLong() {
+    getter.get(getCurrentRow(), holder);
+
+    this.wasNull = holder.isSet == 0;
+    this.wasNullConsumer.setWasNull(this.wasNull);
+    if (this.wasNull) {
+      return 0;
+    }
+
+    return holder.value;
+  }
+
+  @Override
+  public Class<?> getObjectClass() {
+    return Long.class;

Review Comment:
   The actual value returned below is typed `Number`, not `Long`.



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/calendar/ArrowFlightJdbcTimeStampVectorAccessor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.calendar;
+
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Getter;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Holder;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.createGetter;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntSupplier;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.DateUtility;
+
+/**
+ * Accessor for the Arrow types extending from {@link TimeStampVector}.
+ */
+public class ArrowFlightJdbcTimeStampVectorAccessor extends 
ArrowFlightJdbcAccessor {
+
+  private final TimeZone timeZone;
+  private final Getter getter;
+  private final TimeUnit timeUnit;
+  private final LongToLocalDateTime longToLocalDateTime;
+  private final Holder holder;
+
+  /**
+   * Functional interface used to convert a number (in any time resolution) to 
LocalDateTime.
+   */
+  interface LongToLocalDateTime {
+    LocalDateTime fromLong(long value);
+  }
+
+  /**
+   * Instantiate a ArrowFlightJdbcTimeStampVectorAccessor for given vector.
+   */
+  public ArrowFlightJdbcTimeStampVectorAccessor(TimeStampVector vector,
+                                                IntSupplier currentRowSupplier,
+                                                
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    super(currentRowSupplier, setCursorWasNull);
+    this.holder = new Holder();
+    this.getter = createGetter(vector);
+
+    this.timeZone = getTimeZoneForVector(vector);
+    this.timeUnit = getTimeUnitForVector(vector);
+    this.longToLocalDateTime = getLongToLocalDateTimeForVector(vector, 
this.timeZone);
+  }
+
+  @Override
+  public Class<?> getObjectClass() {
+    return Timestamp.class;
+  }
+
+  @Override
+  public Object getObject() {
+    return this.getTimestamp(null);
+  }
+
+  private LocalDateTime getLocalDateTime(Calendar calendar) {
+    getter.get(getCurrentRow(), holder);
+    this.wasNull = holder.isSet == 0;
+    this.wasNullConsumer.setWasNull(this.wasNull);
+    if (this.wasNull) {
+      return null;
+    }
+
+    long value = holder.value;
+
+    LocalDateTime localDateTime = this.longToLocalDateTime.fromLong(value);
+
+    if (calendar != null) {
+      TimeZone timeZone = calendar.getTimeZone();
+      long millis = this.timeUnit.toMillis(value);
+      localDateTime = localDateTime

Review Comment:
   Is this adjusting the LocalDateTime (which is implicitly in the ArrowType's 
timezone?) to the requested timezone? Isn't there a more principled way to do 
this (LocalDateTime -> ZonedDateTime -> convert -> back to LocalDateTime)?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.lang.String.format;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.remote.TypedValue;
+
+/**
+ * Metadata handler for Arrow Flight.
+ */
+public class ArrowFlightMetaImpl extends MetaImpl {
+  private final Map<StatementHandle, PreparedStatement> 
statementHandlePreparedStatementMap;
+
+  /**
+   * Constructs a {@link MetaImpl} object specific for Arrow Flight.
+   * @param connection A {@link AvaticaConnection}.
+   */
+  public ArrowFlightMetaImpl(final AvaticaConnection connection) {
+    super(connection);
+    this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
+    setDefaultConnectionProperties();
+  }
+
+  static Signature newSignature(final String sql) {
+    return new Signature(
+        new ArrayList<ColumnMetaData>(),
+        sql,
+        Collections.<AvaticaParameter>emptyList(),
+        Collections.<String, Object>emptyMap(),
+        null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
+        StatementType.SELECT
+    );
+  }
+
+  @Override
+  public void closeStatement(final StatementHandle statementHandle) {
+    PreparedStatement preparedStatement = 
statementHandlePreparedStatementMap.remove(statementHandle);
+    // Testing if the prepared statement was created because the statement can 
be not created until this moment
+    if (preparedStatement != null) {
+      preparedStatement.close();
+    }
+  }
+
+  @Override
+  public void commit(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final long 
maxRowCount) {
+    // TODO Why is maxRowCount ignored?

Review Comment:
   File a JIRA so this can be `TODO(ARROW-YYYYY): ...`?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.util.Objects.isNull;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.arrow.driver.jdbc.utils.ConvertUtils;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaResultSetMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.Meta.Frame;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.QueryState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ResultSet} implementation used to access a {@link VectorSchemaRoot}.
+ */
+public class ArrowFlightJdbcVectorSchemaRootResultSet extends AvaticaResultSet 
{
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ArrowFlightJdbcVectorSchemaRootResultSet.class);
+  VectorSchemaRoot vectorSchemaRoot;
+
+  ArrowFlightJdbcVectorSchemaRootResultSet(final AvaticaStatement statement, 
final QueryState state,
+                                           final Signature signature,
+                                           final ResultSetMetaData 
resultSetMetaData,
+                                           final TimeZone timeZone, final 
Frame firstFrame)
+      throws SQLException {
+    super(statement, state, signature, resultSetMetaData, timeZone, 
firstFrame);
+  }
+
+  /**
+   * Instantiate a ResultSet backed up by given VectorSchemaRoot.
+   *
+   * @param vectorSchemaRoot root from which the ResultSet will access.
+   * @return a ResultSet which accesses the given VectorSchemaRoot
+   */
+  public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
+      final VectorSchemaRoot vectorSchemaRoot)
+      throws SQLException {
+    // Similar to how org.apache.calcite.avatica.util.ArrayFactoryImpl does
+
+    final TimeZone timeZone = TimeZone.getDefault();
+    final QueryState state = new QueryState();
+
+    final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null);
+
+    final AvaticaResultSetMetaData resultSetMetaData =
+        new AvaticaResultSetMetaData(null, null, signature);
+    final ArrowFlightJdbcVectorSchemaRootResultSet
+        resultSet =
+        new ArrowFlightJdbcVectorSchemaRootResultSet(null, state, signature, 
resultSetMetaData,
+            timeZone, null);
+
+    resultSet.execute(vectorSchemaRoot);
+    return resultSet;
+  }
+
+  @Override
+  protected AvaticaResultSet execute() throws SQLException {
+    throw new RuntimeException("Can only execute with 
execute(VectorSchemaRoot)");
+  }
+
+  void execute(final VectorSchemaRoot vectorSchemaRoot) {
+    final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
+    final List<ColumnMetaData> columns = 
ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
+    signature.columns.clear();
+    signature.columns.addAll(columns);
+
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), 
this.signature.columns);
+  }
+
+  void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
+    final List<ColumnMetaData> columns = 
ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
+    signature.columns.clear();
+    signature.columns.addAll(columns);
+
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), 
this.signature.columns);
+  }
+
+  @Override
+  protected void cancel() {
+    signature.columns.clear();
+    super.cancel();
+    try {
+      AutoCloseables.close(vectorSchemaRoot);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    final Set<Exception> exceptions = new HashSet<>();
+    try {
+      if (isClosed()) {
+        return;
+      }
+    } catch (final SQLException e) {
+      exceptions.add(e);
+    }
+    try {
+      AutoCloseables.close(vectorSchemaRoot);
+    } catch (final Exception e) {
+      exceptions.add(e);
+    }
+    if (!isNull(statement)) {
+      try {
+        super.close();
+      } catch (final Exception e) {
+        exceptions.add(e);
+      }
+    }
+    exceptions.parallelStream().forEach(e -> LOGGER.error(e.getMessage(), e));

Review Comment:
   Why parallelStream?



##########
java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/accessor/impl/numeric/ArrowFlightJdbcFloat4VectorAccessorTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.numeric;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;

Review Comment:
   Some other modules use AssertJ, FWIW (but we haven't yet standardized on a 
single assertion helper; we really should).



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java:
##########
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.client;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils;
+import org.apache.arrow.flight.CallOption;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightClientMiddleware;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.auth2.BearerCredentialWriter;
+import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler;
+import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware;
+import org.apache.arrow.flight.client.ClientCookieMiddleware;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.flight.sql.FlightSqlClient;
+import org.apache.arrow.flight.sql.impl.FlightSql.SqlInfo;
+import org.apache.arrow.flight.sql.util.TableRef;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.calcite.avatica.Meta.StatementType;
+
+/**
+ * A {@link FlightSqlClient} handler.
+ */
+public final class ArrowFlightSqlClientHandler implements AutoCloseable {
+
+  private final FlightSqlClient sqlClient;
+  private final Set<CallOption> options = new HashSet<>();
+
+  ArrowFlightSqlClientHandler(final FlightSqlClient sqlClient,
+                              final Collection<CallOption> options) {
+    this.options.addAll(options);
+    this.sqlClient = Preconditions.checkNotNull(sqlClient);
+  }
+
+  /**
+   * Creates a new {@link ArrowFlightSqlClientHandler} from the provided 
{@code client} and {@code options}.
+   *
+   * @param client  the {@link FlightClient} to manage under a {@link 
FlightSqlClient} wrapper.
+   * @param options the {@link CallOption}s to persist in between subsequent 
client calls.
+   * @return a new {@link ArrowFlightSqlClientHandler}.
+   */
+  public static ArrowFlightSqlClientHandler createNewHandler(final 
FlightClient client,
+                                                             final 
Collection<CallOption> options) {
+    return new ArrowFlightSqlClientHandler(new FlightSqlClient(client), 
options);
+  }
+
+  /**
+   * Gets the {@link #options} for the subsequent calls from this handler.
+   *
+   * @return the {@link CallOption}s.
+   */
+  private CallOption[] getOptions() {
+    return options.toArray(new CallOption[0]);
+  }
+
+  /**
+   * Makes an RPC "getStream" request based on the provided {@link FlightInfo}
+   * object. Retrieves the result of the query previously prepared with 
"getInfo."
+   *
+   * @param flightInfo The {@link FlightInfo} instance from which to fetch 
results.
+   * @return a {@code FlightStream} of results.
+   */
+  public List<FlightStream> getStreams(final FlightInfo flightInfo) {
+    return flightInfo.getEndpoints().stream()
+        .map(FlightEndpoint::getTicket)
+        .map(ticket -> sqlClient.getStream(ticket, getOptions()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Makes an RPC "getInfo" request based on the provided {@code query}
+   * object.
+   *
+   * @param query The query.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getInfo(final String query) {
+    return sqlClient.execute(query, getOptions());
+  }
+
+  @Override
+  public void close() throws SQLException {
+    try {
+      AutoCloseables.close(sqlClient);
+    } catch (final Exception e) {
+      throw new SQLException("Failed to clean up client resources.", e);
+    }
+  }
+
+  /**
+   * A prepared statement handler.
+   */
+  public interface PreparedStatement extends AutoCloseable {
+    /**
+     * Executes this {@link PreparedStatement}.
+     *
+     * @return the {@link FlightInfo} representing the outcome of this query 
execution.
+     * @throws SQLException on error.
+     */
+    FlightInfo executeQuery() throws SQLException;
+
+    /**
+     * Executes a {@link StatementType#UPDATE} query.
+     *
+     * @return the number of rows affected.
+     */
+    long executeUpdate();
+
+    /**
+     * Gets the {@link StatementType} of this {@link PreparedStatement}.
+     *
+     * @return the Statement Type.
+     */
+    StatementType getType();
+
+    /**
+     * Gets the {@link Schema} of this {@link PreparedStatement}.
+     *
+     * @return {@link Schema}.
+     */
+    Schema getDataSetSchema();
+
+    @Override
+    void close();
+  }
+
+  /**
+   * Creates a new {@link PreparedStatement} for the given {@code query}.
+   *
+   * @param query the SQL query.
+   * @return a new prepared statement.
+   */
+  public PreparedStatement prepare(final String query) {
+    final FlightSqlClient.PreparedStatement preparedStatement =
+        sqlClient.prepare(query, getOptions());
+    return new PreparedStatement() {
+      @Override
+      public FlightInfo executeQuery() throws SQLException {
+        return preparedStatement.execute(getOptions());
+      }
+
+      @Override
+      public long executeUpdate() {
+        return preparedStatement.executeUpdate(getOptions());
+      }
+
+      @Override
+      public StatementType getType() {
+        final Schema schema = preparedStatement.getResultSetSchema();
+        return schema.getFields().isEmpty() ? StatementType.UPDATE : 
StatementType.SELECT;
+      }
+
+      @Override
+      public Schema getDataSetSchema() {
+        return preparedStatement.getResultSetSchema();
+      }
+
+      @Override
+      public void close() {
+        preparedStatement.close(getOptions());
+      }
+    };
+  }
+
+  /**
+   * Makes an RPC "getCatalogs" request.
+   *
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getCatalogs() {
+    return sqlClient.getCatalogs(getOptions());
+  }
+
+  /**
+   * Makes an RPC "getImportedKeys" request based on the provided info.
+   *
+   * @param catalog The catalog name. Must match the catalog name as it is 
stored in the database.
+   *                Retrieves those without a catalog. Null means that the 
catalog name should not be used to
+   *                narrow the search.
+   * @param schema  The schema name. Must match the schema name as it is 
stored in the database.
+   *                "" retrieves those without a schema. Null means that the 
schema name should not be used to narrow
+   *                the search.
+   * @param table   The table name. Must match the table name as it is stored 
in the database.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getImportedKeys(final String catalog, final String schema, 
final String table) {
+    return sqlClient.getImportedKeys(TableRef.of(catalog, schema, table), 
getOptions());
+  }
+
+  /**
+   * Makes an RPC "getExportedKeys" request based on the provided info.
+   *
+   * @param catalog The catalog name. Must match the catalog name as it is 
stored in the database.
+   *                Retrieves those without a catalog. Null means that the 
catalog name should not be used to
+   *                narrow the search.
+   * @param schema  The schema name. Must match the schema name as it is 
stored in the database.
+   *                "" retrieves those without a schema. Null means that the 
schema name should not be used to narrow
+   *                the search.
+   * @param table   The table name. Must match the table name as it is stored 
in the database.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getExportedKeys(final String catalog, final String schema, 
final String table) {
+    return sqlClient.getExportedKeys(TableRef.of(catalog, schema, table), 
getOptions());
+  }
+
+  /**
+   * Makes an RPC "getSchemas" request based on the provided info.
+   *
+   * @param catalog       The catalog name. Must match the catalog name as it 
is stored in the database.
+   *                      Retrieves those without a catalog. Null means that 
the catalog name should not be used to
+   *                      narrow the search.
+   * @param schemaPattern The schema name pattern. Must match the schema name 
as it is stored in the database.
+   *                      Null means that schema name should not be used to 
narrow down the search.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getSchemas(final String catalog, final String 
schemaPattern) {
+    return sqlClient.getSchemas(catalog, schemaPattern, getOptions());
+  }
+
+  /**
+   * Makes an RPC "getTableTypes" request.
+   *
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getTableTypes() {
+    return sqlClient.getTableTypes(getOptions());
+  }
+
+  /**
+   * Makes an RPC "getTables" request based on the provided info.
+   *
+   * @param catalog          The catalog name. Must match the catalog name as 
it is stored in the database.
+   *                         Retrieves those without a catalog. Null means 
that the catalog name should not be used to
+   *                         narrow the search.
+   * @param schemaPattern    The schema name pattern. Must match the schema 
name as it is stored in the database.
+   *                         "" retrieves those without a schema. Null means 
that the schema name should not be used to
+   *                         narrow the search.
+   * @param tableNamePattern The table name pattern. Must match the table name 
as it is stored in the database.
+   * @param types            The list of table types, which must be from the 
list of table types to include.
+   *                         Null returns all types.
+   * @param includeSchema    Whether to include schema.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getTables(final String catalog, final String schemaPattern,
+                              final String tableNamePattern,
+                              final List<String> types, final boolean 
includeSchema) {
+
+    return sqlClient.getTables(catalog, schemaPattern, tableNamePattern, 
types, includeSchema,
+        getOptions());
+  }
+
+  /**
+   * Gets SQL info.
+   *
+   * @return the SQL info.
+   */
+  public FlightInfo getSqlInfo(SqlInfo... info) {
+    return sqlClient.getSqlInfo(info, getOptions());
+  }
+
+  /**
+   * Makes an RPC "getPrimaryKeys" request based on the provided info.
+   *
+   * @param catalog The catalog name; must match the catalog name as it is 
stored in the database.
+   *                "" retrieves those without a catalog.
+   *                Null means that the catalog name should not be used to 
narrow the search.
+   * @param schema  The schema name; must match the schema name as it is 
stored in the database.
+   *                "" retrieves those without a schema. Null means that the 
schema name should not be used to narrow
+   *                the search.
+   * @param table   The table name. Must match the table name as it is stored 
in the database.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getPrimaryKeys(final String catalog, final String schema, 
final String table) {
+    return sqlClient.getPrimaryKeys(TableRef.of(catalog, schema, table), 
getOptions());
+  }
+
+  /**
+   * Makes an RPC "getCrossReference" request based on the provided info.
+   *
+   * @param pkCatalog The catalog name. Must match the catalog name as it is 
stored in the database.
+   *                  Retrieves those without a catalog. Null means that the 
catalog name should not be used to
+   *                  narrow the search.
+   * @param pkSchema  The schema name. Must match the schema name as it is 
stored in the database.
+   *                  "" retrieves those without a schema. Null means that the 
schema name should not be used to narrow
+   *                  the search.
+   * @param pkTable   The table name. Must match the table name as it is 
stored in the database.
+   * @param fkCatalog The catalog name. Must match the catalog name as it is 
stored in the database.
+   *                  Retrieves those without a catalog. Null means that the 
catalog name should not be used to
+   *                  narrow the search.
+   * @param fkSchema  The schema name. Must match the schema name as it is 
stored in the database.
+   *                  "" retrieves those without a schema. Null means that the 
schema name should not be used to narrow
+   *                  the search.
+   * @param fkTable   The table name. Must match the table name as it is 
stored in the database.
+   * @return a {@code FlightStream} of results.
+   */
+  public FlightInfo getCrossReference(String pkCatalog, String pkSchema, 
String pkTable,
+                                      String fkCatalog, String fkSchema, 
String fkTable) {
+    return sqlClient.getCrossReference(TableRef.of(pkCatalog, pkSchema, 
pkTable),
+        TableRef.of(fkCatalog, fkSchema, fkTable),
+        getOptions());
+  }
+
+  /**
+   * Builder for {@link ArrowFlightSqlClientHandler}.
+   */
+  public static final class Builder {
+    private final Set<FlightClientMiddleware.Factory> middlewareFactories = 
new HashSet<>();
+    private final Set<CallOption> options = new HashSet<>();
+    private String host;
+    private int port;
+    private String username;
+    private String password;
+    private String trustStorePath;
+    private String trustStorePassword;
+    private String token;
+    private boolean useEncryption;
+    private boolean disableCertificateVerification;
+    private boolean useSystemTrustStore;
+    private BufferAllocator allocator;
+
+    /**
+     * Sets the host for this handler.
+     *
+     * @param host the host.
+     * @return this instance.
+     */
+    public Builder withHost(final String host) {
+      this.host = host;
+      return this;
+    }
+
+    /**
+     * Sets the port for this handler.
+     *
+     * @param port the port.
+     * @return this instance.
+     */
+    public Builder withPort(final int port) {
+      this.port = port;
+      return this;
+    }
+
+    /**
+     * Sets the username for this handler.
+     *
+     * @param username the username.
+     * @return this instance.
+     */
+    public Builder withUsername(final String username) {
+      this.username = username;
+      return this;
+    }
+
+    /**
+     * Sets the password for this handler.
+     *
+     * @param password the password.
+     * @return this instance.
+     */
+    public Builder withPassword(final String password) {
+      this.password = password;
+      return this;
+    }
+
+    /**
+     * Sets the KeyStore path for this handler.
+     *
+     * @param trustStorePath the KeyStore path.
+     * @return this instance.
+     */
+    public Builder withTrustStorePath(final String trustStorePath) {
+      this.trustStorePath = trustStorePath;
+      return this;
+    }
+
+    /**
+     * Sets the KeyStore password for this handler.
+     *
+     * @param trustStorePassword the KeyStore password.
+     * @return this instance.
+     */
+    public Builder withTrustStorePassword(final String trustStorePassword) {
+      this.trustStorePassword = trustStorePassword;
+      return this;
+    }
+
+    /**
+     * Sets whether to use TLS encryption in this handler.
+     *
+     * @param useEncryption whether to use TLS encryption.
+     * @return this instance.
+     */
+    public Builder withEncryption(final boolean useEncryption) {
+      this.useEncryption = useEncryption;
+      return this;
+    }
+
+    /**
+     * Sets whether to disable the certificate verification in this handler.
+     *
+     * @param disableCertificateVerification whether to disable certificate 
verification.
+     * @return this instance.
+     */
+    public Builder withDisableCertificateVerification(final boolean 
disableCertificateVerification) {
+      this.disableCertificateVerification = disableCertificateVerification;
+      return this;
+    }
+
+    /**
+     * Sets whether to use the certificates from the operating system.
+     *
+     * @param useSystemTrustStore whether to use the system operating 
certificates.
+     * @return this instance.
+     */
+    public Builder withSystemTrustStore(final boolean useSystemTrustStore) {
+      this.useSystemTrustStore = useSystemTrustStore;
+      return this;
+    }
+
+    /**
+     * Sets the token used in the token authetication.
+     * @param token the token value.
+     * @return      this builder instance.
+     */
+    public Builder withToken(final String token) {
+      this.token = token;
+      return this;
+    }
+
+    /**
+     * Sets the {@link BufferAllocator} to use in this handler.
+     *
+     * @param allocator the allocator.
+     * @return this instance.
+     */
+    public Builder withBufferAllocator(final BufferAllocator allocator) {
+      this.allocator = allocator
+          .newChildAllocator("ArrowFlightSqlClientHandler", 0, 
allocator.getLimit());
+      return this;
+    }
+
+    /**
+     * Adds the provided {@code factories} to the list of {@link 
#middlewareFactories} of this handler.
+     *
+     * @param factories the factories to add.
+     * @return this instance.
+     */
+    public Builder withMiddlewareFactories(final 
FlightClientMiddleware.Factory... factories) {
+      return withMiddlewareFactories(Arrays.asList(factories));
+    }
+
+    /**
+     * Adds the provided {@code factories} to the list of {@link 
#middlewareFactories} of this handler.
+     *
+     * @param factories the factories to add.
+     * @return this instance.
+     */
+    public Builder withMiddlewareFactories(
+        final Collection<FlightClientMiddleware.Factory> factories) {
+      this.middlewareFactories.addAll(factories);
+      return this;
+    }
+
+    /**
+     * Adds the provided {@link CallOption}s to this handler.
+     *
+     * @param options the options
+     * @return this instance.
+     */
+    public Builder withCallOptions(final CallOption... options) {
+      return withCallOptions(Arrays.asList(options));
+    }
+
+    /**
+     * Adds the provided {@link CallOption}s to this handler.
+     *
+     * @param options the options
+     * @return this instance.
+     */
+    public Builder withCallOptions(final Collection<CallOption> options) {
+      this.options.addAll(options);
+      return this;
+    }
+
+    /**
+     * Builds a new {@link ArrowFlightSqlClientHandler} from the provided 
fields.
+     *
+     * @return a new client handler.
+     * @throws SQLException on error.
+     */
+    public ArrowFlightSqlClientHandler build() throws SQLException {
+      FlightClient client = null;
+      try {
+        ClientIncomingAuthHeaderMiddleware.Factory authFactory = null;
+        if (username != null) {
+          authFactory =
+              new ClientIncomingAuthHeaderMiddleware.Factory(new 
ClientBearerHeaderHandler());
+          withMiddlewareFactories(authFactory);
+        }
+        final FlightClient.Builder clientBuilder = 
FlightClient.builder().allocator(allocator);
+        withMiddlewareFactories(new ClientCookieMiddleware.Factory());
+        middlewareFactories.forEach(clientBuilder::intercept);
+        Location location;
+        if (useEncryption) {
+          location = Location.forGrpcTls(host, port);
+          clientBuilder.useTls();
+        } else {
+          location = Location.forGrpcInsecure(host, port);
+        }
+        clientBuilder.location(location);
+
+        if (useEncryption) {
+          if (disableCertificateVerification) {
+            clientBuilder.verifyServer(false);
+          } else {
+            if (useSystemTrustStore) {

Review Comment:
   Doesn't gRPC configure this for you?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
+import org.apache.arrow.driver.jdbc.utils.UrlParser;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * JDBC driver for querying data from an Apache Arrow Flight server.
+ */
+public class ArrowFlightJdbcDriver extends UnregisteredDriver {
+
+  private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight://";
+  private static final String CONNECTION_STRING_EXPECTED = 
"jdbc:arrow-flight://[host][:port][?param1=value&...]";
+  private static DriverVersion version;
+
+  static {
+    // Special code for supporting Java9 and higher.
+    // Netty requires some extra properties to unlock some native memory 
management api
+    // Setting this property if not already set externally
+    // This has to be done before any netty class is being loaded
+    final String key = "cfjd.io.netty.tryReflectionSetAccessible";
+    final String tryReflectionSetAccessible = System.getProperty(key);
+    if (tryReflectionSetAccessible == null) {
+      System.setProperty(key, Boolean.TRUE.toString());
+    }
+
+    new ArrowFlightJdbcDriver().register();
+  }

Review Comment:
   This is another thing that needs to go into documentation, but I think even
this won't be adequate: anything using Arrow on JDK 16+ will require an
`--add-opens` flag.



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/calendar/ArrowFlightJdbcTimeStampVectorAccessor.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.calendar;
+
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Getter;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.Holder;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorGetter.createGetter;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntSupplier;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.DateUtility;
+
+/**
+ * Accessor for the Arrow types extending from {@link TimeStampVector}.
+ */
+public class ArrowFlightJdbcTimeStampVectorAccessor extends 
ArrowFlightJdbcAccessor {
+
+  private final TimeZone timeZone;
+  private final Getter getter;
+  private final TimeUnit timeUnit;
+  private final LongToLocalDateTime longToLocalDateTime;
+  private final Holder holder;
+
+  /**
+   * Functional interface used to convert a number (in any time resolution) to 
LocalDateTime.
+   */
+  interface LongToLocalDateTime {
+    LocalDateTime fromLong(long value);
+  }
+
+  /**
+   * Instantiate a ArrowFlightJdbcTimeStampVectorAccessor for given vector.
+   */
+  public ArrowFlightJdbcTimeStampVectorAccessor(TimeStampVector vector,
+                                                IntSupplier currentRowSupplier,
+                                                
ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    super(currentRowSupplier, setCursorWasNull);
+    this.holder = new Holder();
+    this.getter = createGetter(vector);
+
+    this.timeZone = getTimeZoneForVector(vector);
+    this.timeUnit = getTimeUnitForVector(vector);
+    this.longToLocalDateTime = getLongToLocalDateTimeForVector(vector, 
this.timeZone);
+  }
+
+  @Override
+  public Class<?> getObjectClass() {
+    return Timestamp.class;
+  }
+
+  @Override
+  public Object getObject() {
+    return this.getTimestamp(null);
+  }
+
+  private LocalDateTime getLocalDateTime(Calendar calendar) {
+    getter.get(getCurrentRow(), holder);
+    this.wasNull = holder.isSet == 0;
+    this.wasNullConsumer.setWasNull(this.wasNull);
+    if (this.wasNull) {
+      return null;
+    }
+
+    long value = holder.value;
+
+    LocalDateTime localDateTime = this.longToLocalDateTime.fromLong(value);
+
+    if (calendar != null) {
+      TimeZone timeZone = calendar.getTimeZone();
+      long millis = this.timeUnit.toMillis(value);
+      localDateTime = localDateTime
+          .minus(timeZone.getOffset(millis) - this.timeZone.getOffset(millis), 
ChronoUnit.MILLIS);
+    }
+    return localDateTime;
+  }
+
+  @Override
+  public Date getDate(Calendar calendar) {
+    LocalDateTime localDateTime = getLocalDateTime(calendar);
+    if (localDateTime == null) {
+      return null;
+    }
+
+    return new Date(Timestamp.valueOf(localDateTime).getTime());
+  }
+
+  @Override
+  public Time getTime(Calendar calendar) {
+    LocalDateTime localDateTime = getLocalDateTime(calendar);
+    if (localDateTime == null) {
+      return null;
+    }
+
+    return new Time(Timestamp.valueOf(localDateTime).getTime());
+  }
+
+  @Override
+  public Timestamp getTimestamp(Calendar calendar) {
+    LocalDateTime localDateTime = getLocalDateTime(calendar);
+    if (localDateTime == null) {
+      return null;
+    }
+
+    return Timestamp.valueOf(localDateTime);
+  }
+
+  protected static TimeUnit getTimeUnitForVector(TimeStampVector vector) {
+    ArrowType.Timestamp arrowType =
+        (ArrowType.Timestamp) vector.getField().getFieldType().getType();
+
+    switch (arrowType.getUnit()) {
+      case NANOSECOND:
+        return TimeUnit.NANOSECONDS;
+      case MICROSECOND:
+        return TimeUnit.MICROSECONDS;
+      case MILLISECOND:
+        return TimeUnit.MILLISECONDS;
+      case SECOND:
+        return TimeUnit.SECONDS;
+      default:
+        throw new UnsupportedOperationException("Invalid Arrow time unit");
+    }
+  }
+
+  protected static LongToLocalDateTime 
getLongToLocalDateTimeForVector(TimeStampVector vector,
+                                                                       
TimeZone timeZone) {
+    String timeZoneID = timeZone.getID();
+
+    ArrowType.Timestamp arrowType =
+        (ArrowType.Timestamp) vector.getField().getFieldType().getType();
+
+    switch (arrowType.getUnit()) {
+      case NANOSECOND:
+        return nanoseconds -> 
DateUtility.getLocalDateTimeFromEpochNano(nanoseconds, timeZoneID);
+      case MICROSECOND:
+        return microseconds -> 
DateUtility.getLocalDateTimeFromEpochMicro(microseconds, timeZoneID);
+      case MILLISECOND:
+        return milliseconds -> 
DateUtility.getLocalDateTimeFromEpochMilli(milliseconds, timeZoneID);
+      case SECOND:
+        return seconds -> DateUtility.getLocalDateTimeFromEpochMilli(
+            TimeUnit.SECONDS.toMillis(seconds), timeZoneID);
+      default:
+        throw new UnsupportedOperationException("Invalid Arrow time unit");
+    }
+  }
+
+  protected static TimeZone getTimeZoneForVector(TimeStampVector vector) {
+    ArrowType.Timestamp arrowType =
+        (ArrowType.Timestamp) vector.getField().getFieldType().getType();
+
+    String timezoneName = arrowType.getTimezone();
+    if (timezoneName == null) {
+      return TimeZone.getTimeZone("UTC");

Review Comment:
   I suppose the issue here is that a JDBC `Timestamp` is always "zoned", and
there's no way to represent a timestamp without a time zone the way that Arrow can?
   
   As a follow-up, we should start a documentation page to collect caveats like 
this



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFactory.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.AvaticaResultSetMetaData;
+import org.apache.calcite.avatica.AvaticaSpecificDatabaseMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * Factory for the Arrow Flight JDBC Driver.
+ */
+public class ArrowFlightJdbcFactory implements AvaticaFactory {
+  private final int major;
+  private final int minor;
+
+  // This needs to be public so Avatica can call this constructor
+  public ArrowFlightJdbcFactory() {
+    this(4, 1);

Review Comment:
   Any particular reason for these version numbers (4 and 1)?
   
   Can they be named constants?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
+
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+/**
+ * Connection to the Arrow Flight server.
+ */
+public final class ArrowFlightConnection extends AvaticaConnection {
+
+  private final BufferAllocator allocator;
+  private final ArrowFlightSqlClientHandler clientHandler;
+  private final ArrowFlightConnectionConfigImpl config;
+  private ExecutorService executorService;
+
+  /**
+   * Builds a connection around already-created collaborators.
+   *
+   * <p>The first four arguments are forwarded to the {@link AvaticaConnection} constructor;
+   * the remaining three are stored on this instance and must be non-null.
+   *
+   * @param driver        the {@link ArrowFlightJdbcDriver} that owns this connection.
+   * @param factory       the {@link AvaticaFactory} forwarded to the superclass.
+   * @param url           the connection URL forwarded to the superclass.
+   * @param properties    the connection {@link Properties} forwarded to the superclass.
+   * @param config        the parsed {@link ArrowFlightConnectionConfigImpl}; never null.
+   * @param allocator     the {@link BufferAllocator} kept by this connection; never null.
+   * @param clientHandler the {@link ArrowFlightSqlClientHandler} kept by this connection;
+   *                      never null.
+   */
+  private ArrowFlightConnection(final ArrowFlightJdbcDriver driver,
+                                final AvaticaFactory factory,
+                                final String url,
+                                final Properties properties,
+                                final ArrowFlightConnectionConfigImpl config,
+                                final BufferAllocator allocator,
+                                final ArrowFlightSqlClientHandler clientHandler) {
+    super(driver, factory, url, properties);
+    // Null checks run in field order: config, allocator, handler.
+    this.config = Preconditions.checkNotNull(config, "Config cannot be null.");
+    this.allocator = Preconditions.checkNotNull(allocator, "Allocator cannot be null.");
+    this.clientHandler = Preconditions.checkNotNull(clientHandler, "Handler cannot be null.");
+  }
+
+  /**
+   * Factory method: derives the configuration from {@code properties}, creates the client
+   * handler for it, and wraps both in a new {@link ArrowFlightConnection}.
+   *
+   * @param driver     the {@link ArrowFlightJdbcDriver} to use.
+   * @param factory    the {@link AvaticaFactory} to use.
+   * @param url        the connection URL; it is passed through {@code replaceSemiColons}
+   *                   before being handed to the connection.
+   * @param properties the {@link Properties} the configuration is built from.
+   * @param allocator  the {@link BufferAllocator} shared with the client handler.
+   * @return a new {@link ArrowFlightConnection}.
+   * @throws SQLException if the client handler cannot be created.
+   */
+  static ArrowFlightConnection createNewConnection(final ArrowFlightJdbcDriver driver,
+                                                   final AvaticaFactory factory,
+                                                   String url, final Properties properties,
+                                                   final BufferAllocator allocator)
+      throws SQLException {
+    final String normalizedUrl = replaceSemiColons(url);
+    final ArrowFlightConnectionConfigImpl connectionConfig =
+        new ArrowFlightConnectionConfigImpl(properties);
+    // The handler is created before the connection object itself is constructed.
+    final ArrowFlightSqlClientHandler handler =
+        createNewClientHandler(connectionConfig, allocator);
+    return new ArrowFlightConnection(
+        driver, factory, normalizedUrl, properties, connectionConfig, allocator, handler);
+  }
+
+  /**
+   * Builds the {@link ArrowFlightSqlClientHandler} described by {@code config}.
+   *
+   * <p>If building fails, the given allocator is closed before the failure is rethrown, so a
+   * failed connection attempt does not leave it open. This applies to unchecked failures
+   * as well as to {@link SQLException}. Any error raised while closing the allocator is
+   * attached to the original failure as a suppressed exception.
+   *
+   * @param config    the connection configuration supplying host, port, credentials,
+   *                  TLS settings, token and call options.
+   * @param allocator the allocator handed to the handler; closed on failure.
+   * @return the newly built handler.
+   * @throws SQLException if the builder reports a failure.
+   */
+  private static ArrowFlightSqlClientHandler createNewClientHandler(
+      final ArrowFlightConnectionConfigImpl config,
+      final BufferAllocator allocator) throws SQLException {
+    try {
+      return new ArrowFlightSqlClientHandler.Builder()
+          .withHost(config.getHost())
+          .withPort(config.getPort())
+          .withUsername(config.getUser())
+          .withPassword(config.getPassword())
+          .withTrustStorePath(config.getTrustStorePath())
+          .withTrustStorePassword(config.getTrustStorePassword())
+          .withSystemTrustStore(config.useSystemTrustStore())
+          .withBufferAllocator(allocator)
+          .withEncryption(config.useEncryption())
+          .withDisableCertificateVerification(config.getDisableCertificateVerification())
+          .withToken(config.getToken())
+          .withCallOptions(config.toCallOption())
+          .build();
+    } catch (final SQLException | RuntimeException e) {
+      // Runtime failures must release the allocator too, not only SQLException.
+      try {
+        allocator.close();
+      } catch (final Exception allocatorCloseEx) {
+        e.addSuppressed(allocatorCloseEx);
+      }
+      throw e;
+    }
+  }
+
+  void reset() throws SQLException {
+    // Clean up any open Statements
+    try {
+      AutoCloseables.close(statementMap.values());
+    } catch (final Exception e) {
+      throw AvaticaConnection.HELPER.createException(e.getMessage(), e);
+    }
+
+    statementMap.clear();
+
+    // Reset Holdability
+    this.setHoldability(this.metaData.getResultSetHoldability());
+
+    // Reset Meta
+    ((ArrowFlightMetaImpl) this.meta).setDefaultConnectionProperties();
+  }
+
+  /**
+   * Returns the {@link ArrowFlightSqlClientHandler} this connection was created with.
+   *
+   * @return the handler stored in {@link #clientHandler}.
+   * @throws SQLException declared by the signature; the current body does not throw it.
+   */
+  ArrowFlightSqlClientHandler getClientHandler() throws SQLException {
+    return this.clientHandler;
+  }
+
+  /**
+   * Returns this connection's {@link ExecutorService}, creating it on first use.
+   *
+   * <p>The pool is a fixed thread pool sized by {@code config.threadPoolSize()}; its threads
+   * come from a {@link DefaultThreadFactory} named after this class's simple name.
+   *
+   * @return the {@link #executorService}, never null.
+   */
+  synchronized ExecutorService getExecutorService() {
+    if (executorService == null) {
+      executorService = Executors.newFixedThreadPool(
+          config.threadPoolSize(),
+          new DefaultThreadFactory(getClass().getSimpleName()));
+    }
+    return executorService;
+  }
+
+  /**
+   * Returns a copy of this connection's properties.
+   *
+   * @return a new {@link Properties} holding the entries of {@code info}; modifying it does
+   *     not change the connection's own properties.
+   */
+  @Override
+  public Properties getClientInfo() {
+    final Properties snapshot = new Properties();
+    snapshot.putAll(info);
+    return snapshot;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (executorService != null) {
+      executorService.shutdown();

Review Comment:
   I think you also want to `awaitTermination` afterwards?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
+import org.apache.arrow.driver.jdbc.utils.UrlParser;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * JDBC driver for querying data from an Apache Arrow Flight server.
+ */
+public class ArrowFlightJdbcDriver extends UnregisteredDriver {
+
+  private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight://";
+  private static final String CONNECTION_STRING_EXPECTED = 
"jdbc:arrow-flight://[host][:port][?param1=value&...]";
+  private static DriverVersion version;
+
+  static {
+    // Java 9+ support: Netty needs this property to unlock part of its native memory
+    // management API. It is set only when nothing has been configured externally, and it
+    // has to happen before any Netty class is loaded.
+    final String nettyReflectionKey = "cfjd.io.netty.tryReflectionSetAccessible";
+    if (System.getProperty(nettyReflectionKey) == null) {
+      System.setProperty(nettyReflectionKey, Boolean.TRUE.toString());
+    }
+
+    // Register a driver instance as soon as this class is initialized.
+    new ArrowFlightJdbcDriver().register();
+  }
+
+  /**
+   * Opens a connection using {@code info} merged with the arguments found in {@code url}.
+   *
+   * @param url  the connection URL; when non-null, its arguments override entries of
+   *             {@code info} with the same key.
+   * @param info the caller-supplied connection properties.
+   * @return the new {@link ArrowFlightConnection}.
+   * @throws SQLException if the connection cannot be created; a
+   *     {@link FlightRuntimeException} is wrapped with the message "Failed to connect.".
+   */
+  @Override
+  public ArrowFlightConnection connect(final String url, final Properties info)
+      throws SQLException {
+    // `info` is used both as the defaults of the merged set and as copied entries.
+    final Properties merged = new Properties(info);
+    merged.putAll(info);
+
+    // URL arguments are applied last.
+    if (url != null) {
+      final Map<Object, Object> urlArguments = getUrlsArgs(url);
+      merged.putAll(urlArguments);
+    }
+
+    try {
+      return ArrowFlightConnection.createNewConnection(
+          this, factory, url, merged, new RootAllocator(Long.MAX_VALUE));
+    } catch (final FlightRuntimeException connectFailure) {
+      throw new SQLException("Failed to connect.", connectFailure);
+    }
+  }
+
+  @Override
+  protected String getFactoryClassName(final JdbcVersion jdbcVersion) {
+    return ArrowFlightJdbcFactory.class.getName();
+  }
+
+  @Override
+  protected DriverVersion createDriverVersion() {
+    if (version == null) {
+      final InputStream flightProperties = 
this.getClass().getResourceAsStream("/properties/flight.properties");
+      if (flightProperties == null) {
+        throw new RuntimeException("Flight Properties not found. Ensure the 
JAR was built properly.");
+      }
+      try (final Reader reader = new BufferedReader(new 
InputStreamReader(flightProperties, StandardCharsets.UTF_8))) {
+        final Properties properties = new Properties();
+        properties.load(reader);
+
+        final String parentName = 
properties.getProperty("org.apache.arrow.flight.name");

Review Comment:
   Similarly, these properties should be namespaced consistently with the 
package, e.g. `org.apache.arrow.driver.jdbc.name`



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/accessor/impl/complex/AbstractArrowFlightJdbcUnionVectorAccessor.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.complex;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.SQLException;
+import java.sql.SQLXML;
+import java.sql.Struct;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.function.IntSupplier;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessor;
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import 
org.apache.arrow.driver.jdbc.accessor.impl.ArrowFlightJdbcNullVectorAccessor;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.DenseUnionVector;
+import org.apache.arrow.vector.complex.UnionVector;
+
+/**
+ * Base accessor for {@link UnionVector} and {@link DenseUnionVector}.
+ */
+public abstract class AbstractArrowFlightJdbcUnionVectorAccessor extends 
ArrowFlightJdbcAccessor {
+
+  /**
+   * Array of accessors for each type contained in UnionVector.
+   * Index corresponds to UnionVector and DenseUnionVector typeIds which are 
both limited to 128.
+   */
+  private final ArrowFlightJdbcAccessor[] accessors = new 
ArrowFlightJdbcAccessor[128];
+
+  private final ArrowFlightJdbcNullVectorAccessor nullAccessor =
+      new ArrowFlightJdbcNullVectorAccessor((boolean wasNull) -> {
+      });
+
+  /**
+   * Passes both arguments unchanged to the {@link ArrowFlightJdbcAccessor} constructor.
+   *
+   * @param currentRowSupplier supplier forwarded to the superclass
+   *                           (presumably yields the current row index; confirm in the base class).
+   * @param setCursorWasNull   was-null consumer forwarded to the superclass.
+   */
+  protected AbstractArrowFlightJdbcUnionVectorAccessor(IntSupplier 
currentRowSupplier,
+      ArrowFlightJdbcAccessorFactory.WasNullConsumer setCursorWasNull) {
+    super(currentRowSupplier, setCursorWasNull);
+  }
+
+  /**
+   * Creates an accessor for the given vector.
+   *
+   * <p>NOTE(review): presumably called to build the per-type accessors for union child
+   * vectors; confirm against the subclasses, which are not visible here.
+   *
+   * @param vector the vector to create an accessor for.
+   * @return the accessor for {@code vector}.
+   */
+  protected abstract ArrowFlightJdbcAccessor 
createAccessorForVector(ValueVector vector);
+
+  /**
+   * Returns the type id of the row currently being accessed.
+   *
+   * <p>{@link #getAccessor()} treats a negative value as "the current row has no type
+   * defined".
+   *
+   * @return the type id for the current row; may be negative.
+   */
+  protected abstract byte getCurrentTypeId();
+
+  /**
+   * Returns the child vector associated with the given type id.
+   *
+   * <p>{@link #getAccessor()} calls this before it checks whether the type id is negative,
+   * so implementations are also invoked with negative ids.
+   *
+   * @param typeId the type id, as returned by {@link #getCurrentTypeId()}.
+   * @return the child vector for {@code typeId}.
+   */
+  protected abstract ValueVector getVectorByTypeId(byte typeId);
+
+  /**
+   * Returns an accessor for UnionVector child vector on current row.
+   *
+   * @return ArrowFlightJdbcAccessor for child vector on current row.
+   */
+  protected ArrowFlightJdbcAccessor getAccessor() {
+    // Get the typeId and child vector for the current row being accessed.
+    byte typeId = this.getCurrentTypeId();
+    ValueVector vector = this.getVectorByTypeId(typeId);
+
+    if (typeId < 0) {
+      // typeId may be negative if the current row has no type defined.

Review Comment:
   Is this saying that `typeId < 0` if and only if the current row is null?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcDriver.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
+import org.apache.arrow.driver.jdbc.utils.UrlParser;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+/**
+ * JDBC driver for querying data from an Apache Arrow Flight server.
+ */
+public class ArrowFlightJdbcDriver extends UnregisteredDriver {
+
+  private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight://";

Review Comment:
   I'd like us to use `arrow-flight-sql` instead of just `arrow-flight` for 
these sorts of identifiers, or else things will be confusing



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightConnection.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
+
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler;
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+/**
+ * Connection to the Arrow Flight server.
+ */
+public final class ArrowFlightConnection extends AvaticaConnection {
+
+  private final BufferAllocator allocator;
+  private final ArrowFlightSqlClientHandler clientHandler;
+  private final ArrowFlightConnectionConfigImpl config;
+  private ExecutorService executorService;
+
+  /**
+   * Builds a connection around already-created collaborators.
+   *
+   * <p>The first four arguments are forwarded to the {@link AvaticaConnection} constructor;
+   * the remaining three are stored on this instance and must be non-null.
+   *
+   * @param driver        the {@link ArrowFlightJdbcDriver} that owns this connection.
+   * @param factory       the {@link AvaticaFactory} forwarded to the superclass.
+   * @param url           the connection URL forwarded to the superclass.
+   * @param properties    the connection {@link Properties} forwarded to the superclass.
+   * @param config        the parsed {@link ArrowFlightConnectionConfigImpl}; never null.
+   * @param allocator     the {@link BufferAllocator} kept by this connection; never null.
+   * @param clientHandler the {@link ArrowFlightSqlClientHandler} kept by this connection;
+   *                      never null.
+   */
+  private ArrowFlightConnection(final ArrowFlightJdbcDriver driver,
+                                final AvaticaFactory factory,
+                                final String url,
+                                final Properties properties,
+                                final ArrowFlightConnectionConfigImpl config,
+                                final BufferAllocator allocator,
+                                final ArrowFlightSqlClientHandler clientHandler) {
+    super(driver, factory, url, properties);
+    // Null checks run in field order: config, allocator, handler.
+    this.config = Preconditions.checkNotNull(config, "Config cannot be null.");
+    this.allocator = Preconditions.checkNotNull(allocator, "Allocator cannot be null.");
+    this.clientHandler = Preconditions.checkNotNull(clientHandler, "Handler cannot be null.");
+  }
+
+  /**
+   * Factory method: derives the configuration from {@code properties}, creates the client
+   * handler for it, and wraps both in a new {@link ArrowFlightConnection}.
+   *
+   * @param driver     the {@link ArrowFlightJdbcDriver} to use.
+   * @param factory    the {@link AvaticaFactory} to use.
+   * @param url        the connection URL; it is passed through {@code replaceSemiColons}
+   *                   before being handed to the connection.
+   * @param properties the {@link Properties} the configuration is built from.
+   * @param allocator  the {@link BufferAllocator} shared with the client handler.
+   * @return a new {@link ArrowFlightConnection}.
+   * @throws SQLException if the client handler cannot be created.
+   */
+  static ArrowFlightConnection createNewConnection(final ArrowFlightJdbcDriver driver,
+                                                   final AvaticaFactory factory,
+                                                   String url, final Properties properties,
+                                                   final BufferAllocator allocator)
+      throws SQLException {
+    final String normalizedUrl = replaceSemiColons(url);
+    final ArrowFlightConnectionConfigImpl connectionConfig =
+        new ArrowFlightConnectionConfigImpl(properties);
+    // The handler is created before the connection object itself is constructed.
+    final ArrowFlightSqlClientHandler handler =
+        createNewClientHandler(connectionConfig, allocator);
+    return new ArrowFlightConnection(
+        driver, factory, normalizedUrl, properties, connectionConfig, allocator, handler);
+  }
+
+  /**
+   * Builds the {@link ArrowFlightSqlClientHandler} described by {@code config}.
+   *
+   * <p>If building fails, the given allocator is closed before the failure is rethrown, so a
+   * failed connection attempt does not leave it open. This applies to unchecked failures
+   * as well as to {@link SQLException}. Any error raised while closing the allocator is
+   * attached to the original failure as a suppressed exception.
+   *
+   * @param config    the connection configuration supplying host, port, credentials,
+   *                  TLS settings, token and call options.
+   * @param allocator the allocator handed to the handler; closed on failure.
+   * @return the newly built handler.
+   * @throws SQLException if the builder reports a failure.
+   */
+  private static ArrowFlightSqlClientHandler createNewClientHandler(
+      final ArrowFlightConnectionConfigImpl config,
+      final BufferAllocator allocator) throws SQLException {
+    try {
+      return new ArrowFlightSqlClientHandler.Builder()
+          .withHost(config.getHost())
+          .withPort(config.getPort())
+          .withUsername(config.getUser())
+          .withPassword(config.getPassword())
+          .withTrustStorePath(config.getTrustStorePath())
+          .withTrustStorePassword(config.getTrustStorePassword())
+          .withSystemTrustStore(config.useSystemTrustStore())
+          .withBufferAllocator(allocator)
+          .withEncryption(config.useEncryption())
+          .withDisableCertificateVerification(config.getDisableCertificateVerification())
+          .withToken(config.getToken())
+          .withCallOptions(config.toCallOption())
+          .build();
+    } catch (final SQLException | RuntimeException e) {
+      // Runtime failures must release the allocator too, not only SQLException.
+      try {
+        allocator.close();
+      } catch (final Exception allocatorCloseEx) {
+        e.addSuppressed(allocatorCloseEx);
+      }
+      throw e;
+    }
+  }
+
+  void reset() throws SQLException {
+    // Clean up any open Statements
+    try {
+      AutoCloseables.close(statementMap.values());
+    } catch (final Exception e) {
+      throw AvaticaConnection.HELPER.createException(e.getMessage(), e);
+    }
+
+    statementMap.clear();
+
+    // Reset Holdability
+    this.setHoldability(this.metaData.getResultSetHoldability());
+
+    // Reset Meta
+    ((ArrowFlightMetaImpl) this.meta).setDefaultConnectionProperties();
+  }
+
+  /**
+   * Returns the {@link ArrowFlightSqlClientHandler} this connection was created with.
+   *
+   * @return the handler stored in {@link #clientHandler}.
+   * @throws SQLException declared by the signature; the current body does not throw it.
+   */
+  ArrowFlightSqlClientHandler getClientHandler() throws SQLException {
+    return this.clientHandler;
+  }
+
+  /**
+   * Returns this connection's {@link ExecutorService}, creating it on first use.
+   *
+   * <p>The pool is a fixed thread pool sized by {@code config.threadPoolSize()}; its threads
+   * come from a {@link DefaultThreadFactory} named after this class's simple name.
+   *
+   * @return the {@link #executorService}, never null.
+   */
+  synchronized ExecutorService getExecutorService() {
+    if (executorService == null) {
+      executorService = Executors.newFixedThreadPool(
+          config.threadPoolSize(),
+          new DefaultThreadFactory(getClass().getSimpleName()));
+    }
+    return executorService;
+  }
+
+  /**
+   * Returns a copy of this connection's properties.
+   *
+   * @return a new {@link Properties} holding the entries of {@code info}; modifying it does
+   *     not change the connection's own properties.
+   */
+  @Override
+  public Properties getClientInfo() {
+    final Properties snapshot = new Properties();
+    snapshot.putAll(info);
+    return snapshot;
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (executorService != null) {
+      executorService.shutdown();
+    }
+
+    try {
+      AutoCloseables.close(clientHandler);
+      allocator.getChildAllocators().forEach(AutoCloseables::closeNoChecked);
+      AutoCloseables.close(allocator);

Review Comment:
   It might be good to build a list and pass them all to 
AutoCloseables.close(), so that if one errors, the rest still get closed.



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcConnectionPoolDataSource.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.sql.ConnectionEvent;
+import javax.sql.ConnectionEventListener;
+import javax.sql.ConnectionPoolDataSource;
+import javax.sql.PooledConnection;
+
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
+
+/**
+ * {@link ConnectionPoolDataSource} implementation for Arrow Flight JDBC 
Driver.
+ */
+public class ArrowFlightJdbcConnectionPoolDataSource extends 
ArrowFlightJdbcDataSource
+    implements ConnectionPoolDataSource, ConnectionEventListener, 
AutoCloseable {
+  private final Map<Properties, Queue<ArrowFlightJdbcPooledConnection>> pool =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates a pooling data source; both arguments are passed unchanged to the
+   * {@link ArrowFlightJdbcDataSource} constructor. The connection pool itself starts empty.
+   *
+   * @param properties the connection properties forwarded to the superclass.
+   * @param config     the connection configuration forwarded to the superclass.
+   */
+  protected ArrowFlightJdbcConnectionPoolDataSource(final Properties 
properties,
+                                                    final 
ArrowFlightConnectionConfigImpl config) {
+    super(properties, config);
+  }
+
+  /**
+   * Static factory for {@link ArrowFlightJdbcConnectionPoolDataSource}.
+   *
+   * @param properties the connection properties; also used to build the
+   *                   {@link ArrowFlightConnectionConfigImpl} given to the data source.
+   * @return a new pooling data source.
+   */
+  public static ArrowFlightJdbcConnectionPoolDataSource createNewDataSource(
+      final Properties properties) {
+    final ArrowFlightConnectionConfigImpl derivedConfig =
+        new ArrowFlightConnectionConfigImpl(properties);
+    return new ArrowFlightJdbcConnectionPoolDataSource(properties, derivedConfig);
+  }
+
+  /**
+   * Obtains a pooled connection for the user and password held by this data source's
+   * configuration.
+   *
+   * @return a pooled connection for the configured credentials.
+   * @throws SQLException if the connection cannot be obtained.
+   */
+  @Override
+  public PooledConnection getPooledConnection() throws SQLException {
+    final ArrowFlightConnectionConfigImpl defaults = getConfig();
+    final String configuredUser = defaults.getUser();
+    final String configuredPassword = defaults.getPassword();
+    return this.getPooledConnection(configuredUser, configuredPassword);
+  }
+
+  /**
+   * Obtains a pooled connection for the given credentials.
+   *
+   * <p>Idle connections are kept in one queue per {@link Properties} key. An idle connection
+   * is reset and reused when one is available; otherwise a new one is created.
+   *
+   * @param username the user to connect as.
+   * @param password the user's password.
+   * @return a reused (and reset) or newly created pooled connection.
+   * @throws SQLException if resetting or creating the connection fails.
+   */
+  @Override
+  public PooledConnection getPooledConnection(final String username, final String password)
+      throws SQLException {
+    final Properties poolKey = getProperties(username, password);
+    final ArrowFlightJdbcPooledConnection idle =
+        pool.computeIfAbsent(poolKey, ignored -> new ConcurrentLinkedQueue<>()).poll();
+
+    if (idle != null) {
+      idle.reset();
+      return idle;
+    }
+    return createPooledConnection(new ArrowFlightConnectionConfigImpl(poolKey));
+  }
+
+  private ArrowFlightJdbcPooledConnection createPooledConnection(
+      final ArrowFlightConnectionConfigImpl config)
+      throws SQLException {
+    ArrowFlightJdbcPooledConnection pooledConnection =
+        new ArrowFlightJdbcPooledConnection(getConnection(config.getUser(), 
config.getPassword()));
+    pooledConnection.addConnectionEventListener(this);
+    return pooledConnection;
+  }
+
+  /**
+   * Puts the pooled connection that raised the event back into the idle queue registered
+   * under its properties.
+   *
+   * @param connectionEvent the event whose source is the closed
+   *                        {@link ArrowFlightJdbcPooledConnection}.
+   */
+  @Override
+  public void connectionClosed(ConnectionEvent connectionEvent) {
+    final Object eventSource = connectionEvent.getSource();
+    final ArrowFlightJdbcPooledConnection returned =
+        (ArrowFlightJdbcPooledConnection) eventSource;
+    // NOTE(review): pool.get(...) yields null when no queue exists for these properties,
+    // and add() would then throw; confirm the key always matches one created earlier.
+    final Queue<ArrowFlightJdbcPooledConnection> idleQueue =
+        pool.get(returned.getProperties());
+    idleQueue.add(returned);
+  }
+
+  @Override
+  public void connectionErrorOccurred(ConnectionEvent connectionEvent) {
+    // No action is taken when a pooled connection reports an error.
+
+  }
+
+  @Override
+  public void close() throws Exception {
+    SQLException lastException = null;
+    for (Queue<ArrowFlightJdbcPooledConnection> connections : 
this.pool.values()) {
+      while (!connections.isEmpty()) {
+        PooledConnection pooledConnection = connections.poll();
+        try {
+          pooledConnection.close();
+        } catch (SQLException e) {
+          lastException = e;

Review Comment:
   `SQLException` lets you attach further exceptions (e.g. via `setNextException` or `addSuppressed`), so you could report them all instead of only the last one



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.lang.String.format;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.remote.TypedValue;
+
+/**
+ * Metadata handler for Arrow Flight.
+ */
+public class ArrowFlightMetaImpl extends MetaImpl {
+  private final Map<StatementHandle, PreparedStatement> 
statementHandlePreparedStatementMap;
+
+  /**
+   * Constructs a {@link MetaImpl} object specific for Arrow Flight.
+   * @param connection A {@link AvaticaConnection}.
+   */
+  public ArrowFlightMetaImpl(final AvaticaConnection connection) {
+    super(connection);
+    this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
+    setDefaultConnectionProperties();
+  }
+
+  static Signature newSignature(final String sql) {
+    return new Signature(
+        new ArrayList<ColumnMetaData>(),
+        sql,
+        Collections.<AvaticaParameter>emptyList(),
+        Collections.<String, Object>emptyMap(),
+        null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
+        StatementType.SELECT
+    );
+  }
+
+  @Override
+  public void closeStatement(final StatementHandle statementHandle) {
+    PreparedStatement preparedStatement = 
statementHandlePreparedStatementMap.remove(statementHandle);
+    // Testing if the prepared statement was created because the statement can 
be not created until this moment
+    if (preparedStatement != null) {
+      preparedStatement.close();
+    }
+  }
+
+  @Override
+  public void commit(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final long 
maxRowCount) {
+    // TODO Why is maxRowCount ignored?
+    Preconditions.checkNotNull(statementHandle.signature, "Signature not 
found.");
+    return new ExecuteResult(
+        Collections.singletonList(MetaResultSet.create(
+            statementHandle.connectionId, statementHandle.id,
+            true, statementHandle.signature, null)));
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final int 
maxRowsInFirstFrame) {
+    return execute(statementHandle, typedValues, (long) maxRowsInFirstFrame);
+  }
+
+  @Override
+  public ExecuteBatchResult executeBatch(final StatementHandle statementHandle,
+                                         final List<List<TypedValue>> 
parameterValuesList)
+      throws IllegalStateException {

Review Comment:
   It's weird to declare a RuntimeException in the throws clause



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.lang.String.format;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.remote.TypedValue;
+
+/**
+ * Metadata handler for Arrow Flight.
+ */
+public class ArrowFlightMetaImpl extends MetaImpl {
+  private final Map<StatementHandle, PreparedStatement> 
statementHandlePreparedStatementMap;
+
+  /**
+   * Constructs a {@link MetaImpl} object specific for Arrow Flight.
+   * @param connection A {@link AvaticaConnection}.
+   */
+  public ArrowFlightMetaImpl(final AvaticaConnection connection) {
+    super(connection);
+    this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
+    setDefaultConnectionProperties();
+  }
+
+  static Signature newSignature(final String sql) {
+    return new Signature(
+        new ArrayList<ColumnMetaData>(),
+        sql,
+        Collections.<AvaticaParameter>emptyList(),
+        Collections.<String, Object>emptyMap(),
+        null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
+        StatementType.SELECT
+    );
+  }
+
+  @Override
+  public void closeStatement(final StatementHandle statementHandle) {
+    PreparedStatement preparedStatement = 
statementHandlePreparedStatementMap.remove(statementHandle);
+    // Testing if the prepared statement was created because the statement can 
be not created until this moment
+    if (preparedStatement != null) {
+      preparedStatement.close();
+    }
+  }
+
+  @Override
+  public void commit(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.

Review Comment:
   Should this raise UnsupportedOperationException?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcCursor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.arrow.driver.jdbc.accessor.ArrowFlightJdbcAccessorFactory;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.util.AbstractCursor;
+import org.apache.calcite.avatica.util.ArrayImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Arrow Flight Jdbc's Cursor class.
+ */
+public class ArrowFlightJdbcCursor extends AbstractCursor {
+
+  private static final Logger LOGGER;
+  private final VectorSchemaRoot root;
+  private final int rowCount;
+  private int currentRow = -1;
+
+  static {
+    LOGGER = LoggerFactory.getLogger(ArrowFlightJdbcCursor.class);
+  }
+
+  public ArrowFlightJdbcCursor(VectorSchemaRoot root) {
+    this.root = root;
+    rowCount = root.getRowCount();
+  }
+
+  @Override
+  public List<Accessor> createAccessors(List<ColumnMetaData> columns,
+                                        Calendar localCalendar,
+                                        ArrayImpl.Factory factory) {
+    final List<FieldVector> fieldVectors = root.getFieldVectors();
+
+    return IntStream.range(0, fieldVectors.size()).mapToObj(root::getVector)
+        .map(this::createAccessor)
+        .collect(Collectors.toCollection(() -> new 
ArrayList<>(fieldVectors.size())));
+  }
+
+  private Accessor createAccessor(FieldVector vector) {
+    return ArrowFlightJdbcAccessorFactory.createAccessor(vector, 
this::getCurrentRow,
+        (boolean wasNull) -> {
+          // AbstractCursor creates a boolean array of length 1 to hold the 
wasNull value
+          this.wasNull[0] = wasNull;
+        });
+  }
+
+  /**
+   * ArrowFlightJdbcAccessors do not use {@link AbstractCursor.Getter}, as it 
would box primitive types and cause
+   * performance issues. Each Accessor implementation works directly on Arrow 
Vectors.
+   */
+  @Override
+  protected Getter createGetter(int column) {
+    throw new UnsupportedOperationException("Not allowed.");
+  }
+
+  @Override
+  public boolean next() {
+    currentRow++;
+    return currentRow < rowCount;
+  }
+
+  @Override
+  public void close() {
+    try {
+      AutoCloseables.close(root);
+    } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);

Review Comment:
   Hmm, we don't want to throw an exception here?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.lang.String.format;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.remote.TypedValue;
+
+/**
+ * Metadata handler for Arrow Flight.
+ */
+public class ArrowFlightMetaImpl extends MetaImpl {
+  private final Map<StatementHandle, PreparedStatement> 
statementHandlePreparedStatementMap;
+
+  /**
+   * Constructs a {@link MetaImpl} object specific for Arrow Flight.
+   * @param connection A {@link AvaticaConnection}.
+   */
+  public ArrowFlightMetaImpl(final AvaticaConnection connection) {
+    super(connection);
+    this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
+    setDefaultConnectionProperties();
+  }
+
+  static Signature newSignature(final String sql) {
+    return new Signature(
+        new ArrayList<ColumnMetaData>(),
+        sql,
+        Collections.<AvaticaParameter>emptyList(),
+        Collections.<String, Object>emptyMap(),
+        null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
+        StatementType.SELECT
+    );
+  }
+
+  @Override
+  public void closeStatement(final StatementHandle statementHandle) {
+    PreparedStatement preparedStatement = 
statementHandlePreparedStatementMap.remove(statementHandle);
+    // Testing if the prepared statement was created because the statement can 
be not created until this moment
+    if (preparedStatement != null) {
+      preparedStatement.close();
+    }
+  }
+
+  @Override
+  public void commit(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final long 
maxRowCount) {
+    // TODO Why is maxRowCount ignored?
+    Preconditions.checkNotNull(statementHandle.signature, "Signature not 
found.");
+    return new ExecuteResult(
+        Collections.singletonList(MetaResultSet.create(
+            statementHandle.connectionId, statementHandle.id,
+            true, statementHandle.signature, null)));
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final int 
maxRowsInFirstFrame) {
+    return execute(statementHandle, typedValues, (long) maxRowsInFirstFrame);
+  }
+
+  @Override
+  public ExecuteBatchResult executeBatch(final StatementHandle statementHandle,
+                                         final List<List<TypedValue>> 
parameterValuesList)
+      throws IllegalStateException {
+    throw new IllegalStateException("executeBatch not implemented.");
+  }
+
+  @Override
+  public Frame fetch(final StatementHandle statementHandle, final long offset,
+                     final int fetchMaxRowCount) {
+    /*
+     * ArrowFlightMetaImpl does not use frames.
+     * Instead, we have accessors that contain a VectorSchemaRoot with
+     * the results.
+     */
+    throw AvaticaConnection.HELPER.wrap(
+        format("%s does not use frames.", this),
+        AvaticaConnection.HELPER.unsupported());
+  }
+
+  @Override
+  public StatementHandle prepare(final ConnectionHandle connectionHandle,
+                                 final String query, final long maxRowCount) {
+    final StatementHandle handle = super.createStatement(connectionHandle);
+    handle.signature = newSignature(query);
+    return handle;
+  }
+
+  @Override
+  public ExecuteResult prepareAndExecute(final StatementHandle statementHandle,
+                                         final String query, final long 
maxRowCount,
+                                         final PrepareCallback prepareCallback)
+      throws NoSuchStatementException {
+    return prepareAndExecute(
+        statementHandle, query, maxRowCount, -1 /* Not used */, 
prepareCallback);
+  }
+
+  @Override
+  public ExecuteResult prepareAndExecute(final StatementHandle handle,
+                                         final String query, final long 
maxRowCount,
+                                         final int maxRowsInFirstFrame,
+                                         final PrepareCallback callback)
+      throws NoSuchStatementException {
+    try {
+      final PreparedStatement preparedStatement =
+          ((ArrowFlightConnection) 
connection).getClientHandler().prepare(query);
+      final StatementType statementType = preparedStatement.getType();
+      statementHandlePreparedStatementMap.put(handle, preparedStatement);
+      final Signature signature = newSignature(query);
+      final long updateCount =
+          statementType.equals(StatementType.UPDATE) ? 
preparedStatement.executeUpdate() : -1;
+      synchronized (callback.getMonitor()) {
+        callback.clear();
+        callback.assign(signature, null, updateCount);
+      }
+      callback.execute();
+      final MetaResultSet metaResultSet = 
MetaResultSet.create(handle.connectionId, handle.id,
+          false, signature, null);
+      return new ExecuteResult(Collections.singletonList(metaResultSet));
+    } catch (SQLTimeoutException e) {
+      // So far AvaticaStatement(executeInternal) only handles NoSuchStatement 
and Runtime Exceptions.
+      throw new RuntimeException(e);
+    } catch (SQLException e) {
+      throw new NoSuchStatementException(handle);
+    }
+  }
+
+  @Override
+  public ExecuteBatchResult prepareAndExecuteBatch(
+      final StatementHandle statementHandle, final List<String> queries)
+      throws NoSuchStatementException {
+    // TODO Fill this stub.
+    return null;
+  }
+
+  @Override
+  public void rollback(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.
+  }
+
+  @Override
+  public boolean syncResults(final StatementHandle statementHandle,
+                             final QueryState queryState, final long offset)
+      throws NoSuchStatementException {
+    // TODO Fill this stub.
+    return false;
+  }
+
+  void setDefaultConnectionProperties() {
+    // TODO Double-check this.

Review Comment:
   Is this resolved?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static java.lang.String.format;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
+import org.apache.arrow.util.Preconditions;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.remote.TypedValue;
+
+/**
+ * Metadata handler for Arrow Flight.
+ */
+public class ArrowFlightMetaImpl extends MetaImpl {
+  private final Map<StatementHandle, PreparedStatement> 
statementHandlePreparedStatementMap;
+
+  /**
+   * Constructs a {@link MetaImpl} object specific for Arrow Flight.
+   * @param connection A {@link AvaticaConnection}.
+   */
+  public ArrowFlightMetaImpl(final AvaticaConnection connection) {
+    super(connection);
+    this.statementHandlePreparedStatementMap = new ConcurrentHashMap<>();
+    setDefaultConnectionProperties();
+  }
+
+  static Signature newSignature(final String sql) {
+    return new Signature(
+        new ArrayList<ColumnMetaData>(),
+        sql,
+        Collections.<AvaticaParameter>emptyList(),
+        Collections.<String, Object>emptyMap(),
+        null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
+        StatementType.SELECT
+    );
+  }
+
+  @Override
+  public void closeStatement(final StatementHandle statementHandle) {
+    PreparedStatement preparedStatement = 
statementHandlePreparedStatementMap.remove(statementHandle);
+    // Testing if the prepared statement was created because the statement can 
be not created until this moment
+    if (preparedStatement != null) {
+      preparedStatement.close();
+    }
+  }
+
+  @Override
+  public void commit(final ConnectionHandle connectionHandle) {
+    // TODO Fill this stub.
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final long 
maxRowCount) {
+    // TODO Why is maxRowCount ignored?
+    Preconditions.checkNotNull(statementHandle.signature, "Signature not 
found.");
+    return new ExecuteResult(
+        Collections.singletonList(MetaResultSet.create(
+            statementHandle.connectionId, statementHandle.id,
+            true, statementHandle.signature, null)));
+  }
+
+  @Override
+  public ExecuteResult execute(final StatementHandle statementHandle,
+                               final List<TypedValue> typedValues, final int 
maxRowsInFirstFrame) {
+    return execute(statementHandle, typedValues, (long) maxRowsInFirstFrame);
+  }
+
+  @Override
+  public ExecuteBatchResult executeBatch(final StatementHandle statementHandle,
+                                         final List<List<TypedValue>> 
parameterValuesList)
+      throws IllegalStateException {
+    throw new IllegalStateException("executeBatch not implemented.");

Review Comment:
   And wouldn't it be UnsupportedOperationException?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/utils/UrlParser.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * URL Parser for extracting key values from a connection string.
+ */
+public class UrlParser {
+  /**
+   * Parse a url key value parameters.
+   *
+   * @param url {@link String}
+   * @return {@link Map}
+   */
+  public static Map<String, String> parse(String url, String separator) {
+    Map<String, String> resultMap = new HashMap<>();
+    String[] keyValues = url.split(separator);
+
+    for (String keyValue : keyValues) {
+      int separatorKey = keyValue.indexOf("="); // Find the first equal sign 
to split key and value.
+      String key = keyValue.substring(0, separatorKey);

Review Comment:
   Do we need to url-decode?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/utils/UrlParser.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * URL Parser for extracting key values from a connection string.
+ */
+public class UrlParser {
+  /**
+   * Parse a url key value parameters.
+   *
+   * @param url {@link String}
+   * @return {@link Map}
+   */
+  public static Map<String, String> parse(String url, String separator) {

Review Comment:
   nit: `url` is a bit confusing; it's actually the query string



##########
java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/utils/ThrowableAssertionUtils.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.utils;
+
+/**
+ * Utility class to avoid upgrading JUnit to version >= 4.13 and keep using 
code to assert a {@link Throwable}.

Review Comment:
   We can just use JUnit 5 (other modules do)



##########
java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+
+public class ArrowFlightPreparedStatementTest {
+
+  @ClassRule
+  public static final FlightServerTestRule FLIGHT_SERVER_TEST_RULE = 
FlightServerTestRule
+      .createStandardTestRule(CoreMockedSqlProducers.getLegacyProducer());
+
+  private static Connection connection;
+
+  @Rule
+  public final ErrorCollector collector = new ErrorCollector();
+
+  @BeforeClass
+  public static void setup() throws SQLException {
+    connection = FLIGHT_SERVER_TEST_RULE.getConnection(false);
+  }
+
+  @AfterClass
+  public static void tearDown() throws SQLException {
+    connection.close();
+  }
+
+  @Test
+  public void testSimpleQueryNoParameterBinding() throws SQLException {

Review Comment:
   Is parameter binding unimplemented?



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightStreamQueue.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.utils;
+
+import static java.lang.String.format;
+import static java.util.Collections.synchronizedSet;
+import static org.apache.arrow.util.Preconditions.checkNotNull;
+import static org.apache.arrow.util.Preconditions.checkState;
+
+import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Auxiliary class used to handle consuming of multiple {@link FlightStream}.
+ * <p>
+ * The usage follows this routine:
+ * <ol>
+ *   <li>Create a <code>FlightStreamQueue</code>;</li>
+ *   <li>Call <code>enqueue(FlightStream)</code> for all streams to be 
consumed;</li>
+ *   <li>Call <code>next()</code> to get a FlightStream that is ready to 
consume</li>
+ *   <li>Consume the given FlightStream and add it back to the queue - call 
<code>enqueue(FlightStream)</code></li>
+ *   <li>Repeat from (3) until <code>next()</code> returns null.</li>
+ * </ol>
+ */
+public class FlightStreamQueue implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightStreamQueue.class);
+  private final CompletionService<FlightStream> completionService;
+  private final Set<Future<FlightStream>> futures = synchronizedSet(new 
HashSet<>());
+  private final Set<FlightStream> allStreams = synchronizedSet(new 
HashSet<>());
+  private final AtomicBoolean closed = new AtomicBoolean();
+
+  /**
+   * Instantiate a new FlightStreamQueue.
+   */
+  protected FlightStreamQueue(final CompletionService<FlightStream> 
executorService) {
+    completionService = checkNotNull(executorService);
+  }
+
+  /**
+   * Creates a new {@link FlightStreamQueue} from the provided {@link 
ExecutorService}.
+   *
+   * @param service the service from which to create a new queue.
+   * @return a new queue.
+   */
+  public static FlightStreamQueue createNewQueue(final ExecutorService 
service) {
+    return new FlightStreamQueue(new ExecutorCompletionService<>(service));
+  }
+
+  /**
+   * Gets whether this queue is closed.
+   *
+   * @return a boolean indicating whether this resource is closed.
+   */
+  public boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Auxiliary functional interface for getting ready-to-consume FlightStreams.
+   */
+  @FunctionalInterface
+  interface FlightStreamSupplier {
+    Future<FlightStream> get() throws SQLException;
+  }
+
+  private FlightStream next(final FlightStreamSupplier flightStreamSupplier) 
throws SQLException {
+    checkOpen();
+    while (!futures.isEmpty()) {
+      final Future<FlightStream> future = flightStreamSupplier.get();
+      futures.remove(future);
+      try {
+        final FlightStream stream = future.get();
+        if (stream.getRoot().getRowCount() > 0) {
+          return stream;
+        }
+      } catch (final ExecutionException | InterruptedException | 
CancellationException e) {
+        throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Blocking request with timeout to get the next ready FlightStream in queue.
+   *
+   * @param timeoutValue the amount of time to be waited
+   * @param timeoutUnit  the timeoutValue time unit
+   * @return a FlightStream that is ready to consume or null if all 
FlightStreams are ended.
+   */
+  public FlightStream next(final long timeoutValue, final TimeUnit timeoutUnit)
+      throws SQLException {
+    return next(() -> {
+      try {
+        final Future<FlightStream> future = 
completionService.poll(timeoutValue, timeoutUnit);
+        if (future != null) {
+          return future;
+        }
+      } catch (final InterruptedException e) {
+        throw new SQLTimeoutException("Query was interrupted", e);
+      }
+
+      throw new SQLTimeoutException(
+          String.format("Query timed out after %d %s", timeoutValue, 
timeoutUnit));
+    });
+  }
+
+  /**
+   * Blocking request to get the next ready FlightStream in queue.
+   *
+   * @return a FlightStream that is ready to consume or null if all 
FlightStreams are ended.
+   */
+  public FlightStream next() throws SQLException {
+    return next(() -> {
+      try {
+        return completionService.take();
+      } catch (final InterruptedException e) {
+        throw AvaticaConnection.HELPER.wrap(e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Checks if this queue is open.
+   */
+  public synchronized void checkOpen() {
+    checkState(!isClosed(), format("%s closed", 
this.getClass().getSimpleName()));
+  }
+
+  /**
+   * Readily adds given {@link FlightStream}s to the queue.
+   */
+  public void enqueue(final Collection<FlightStream> flightStreams) {
+    flightStreams.forEach(this::enqueue);

Review Comment:
   nit: this will acquire/release the lock once per stream



##########
java/flight/flight-jdbc-driver/src/main/java/org/apache/arrow/driver/jdbc/utils/UrlParser.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * URL Parser for extracting key values from a connection string.
+ */
+public class UrlParser {
+  /**
+   * Parses the key-value parameters of a URL.
+   *
+   * @param url {@link String}
+   * @return {@link Map}
+   */
+  public static Map<String, String> parse(String url, String separator) {

Review Comment:
   Separator is always `&`, isn't it?



##########
java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/accessor/impl/calendar/ArrowFlightJdbcTimeStampVectorAccessorTest.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc.accessor.impl.calendar;
+
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorAccessor.getTimeUnitForVector;
+import static 
org.apache.arrow.driver.jdbc.accessor.impl.calendar.ArrowFlightJdbcTimeStampVectorAccessor.getTimeZoneForVector;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import 
org.apache.arrow.driver.jdbc.accessor.impl.text.ArrowFlightJdbcVarCharVectorAccessor;
+import org.apache.arrow.driver.jdbc.utils.AccessorTestUtils;
+import org.apache.arrow.driver.jdbc.utils.RootAllocatorTestRule;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.util.Text;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ArrowFlightJdbcTimeStampVectorAccessorTest {

Review Comment:
   I'd be interested in seeing if it handles things like daylight saving time 
boundaries correctly



##########
java/flight/flight-jdbc-driver/src/test/java/org/apache/arrow/driver/jdbc/FlightServerTestRule.java:
##########
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.driver.jdbc;
+
+import static 
org.apache.arrow.driver.jdbc.utils.FlightSqlTestCertificates.CertKeyPair;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Properties;
+
+import org.apache.arrow.driver.jdbc.authentication.Authentication;
+import org.apache.arrow.driver.jdbc.authentication.TokenAuthentication;
+import org.apache.arrow.driver.jdbc.authentication.UserPasswordAuthentication;
+import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl;
+import org.apache.arrow.flight.CallHeaders;
+import org.apache.arrow.flight.CallInfo;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.FlightServer;
+import org.apache.arrow.flight.FlightServerMiddleware;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.RequestContext;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.util.Preconditions;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.alexpanov.net.FreePortFinder;
+
+/**
+ * Utility class for unit tests that need to instantiate a {@link FlightServer}
+ * and interact with it.
+ */
+public class FlightServerTestRule implements TestRule, AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlightServerTestRule.class);
+
+  private final Properties properties;
+  private final ArrowFlightConnectionConfigImpl config;
+  private final BufferAllocator allocator;
+  private final FlightSqlProducer producer;
+  private final Authentication authentication;
+  private final CertKeyPair certKeyPair;
+
+  private final MiddlewareCookie.Factory middlewareCookieFactory = new 
MiddlewareCookie.Factory();
+
+  private FlightServerTestRule(final Properties properties,
+                               final ArrowFlightConnectionConfigImpl config,
+                               final BufferAllocator allocator,
+                               final FlightSqlProducer producer,
+                               final Authentication authentication,
+                               final CertKeyPair certKeyPair) {
+    this.properties = Preconditions.checkNotNull(properties);
+    this.config = Preconditions.checkNotNull(config);
+    this.allocator = Preconditions.checkNotNull(allocator);
+    this.producer = Preconditions.checkNotNull(producer);
+    this.authentication = authentication;
+    this.certKeyPair = certKeyPair;
+  }
+
+  /**
+   * Create a {@link FlightServerTestRule} with standard values such as: user, 
password, localhost.
+   *
+   * @param producer the producer used to create the FlightServerTestRule.
+   * @return the FlightServerTestRule.
+   */
+  public static FlightServerTestRule createStandardTestRule(final 
FlightSqlProducer producer) {
+    UserPasswordAuthentication authentication =
+        new UserPasswordAuthentication.Builder()
+            .user("flight-test-user", "flight-test-password")
+            .build();
+
+    return new Builder()
+        .host("localhost")
+        .randomPort()
+        .authentication(authentication)
+        .producer(producer)
+        .build();
+  }
+
+  ArrowFlightJdbcDataSource createDataSource() {
+    return ArrowFlightJdbcDataSource.createNewDataSource(properties);
+  }
+
+  ArrowFlightJdbcDataSource createDataSource(String token) {
+    properties.put("token", token);
+    return ArrowFlightJdbcDataSource.createNewDataSource(properties);
+  }
+
+  public ArrowFlightJdbcConnectionPoolDataSource 
createConnectionPoolDataSource() {
+    return 
ArrowFlightJdbcConnectionPoolDataSource.createNewDataSource(properties);
+  }
+
+  public ArrowFlightJdbcConnectionPoolDataSource 
createConnectionPoolDataSource(boolean useEncryption) {
+    setUseEncryption(useEncryption);
+    return 
ArrowFlightJdbcConnectionPoolDataSource.createNewDataSource(properties);
+  }
+
+  public Connection getConnection(boolean useEncryption, String token) throws 
SQLException {
+    properties.put("token", token);
+
+    return getConnection(useEncryption);
+  }
+
+  public Connection getConnection(boolean useEncryption) throws SQLException {
+    setUseEncryption(useEncryption);
+    return this.createDataSource().getConnection();
+  }
+
+  private void setUseEncryption(boolean useEncryption) {
+    properties.put("useEncryption", useEncryption);
+  }
+
+  public MiddlewareCookie.Factory getMiddlewareCookieFactory() {
+    return middlewareCookieFactory;
+  }
+
+  @FunctionalInterface
+  public interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException;
+  }
+
+  private FlightServer initiateServer(Location location) throws IOException {
+    FlightServer.Builder builder = FlightServer.builder(allocator, location, 
producer)
+        .headerAuthenticator(authentication.authenticate())
+        .middleware(FlightServerMiddleware.Key.of("KEY"), 
middlewareCookieFactory);
+    if (certKeyPair != null) {
+      builder.useTls(certKeyPair.cert, certKeyPair.key);
+    }
+    return builder.build();
+  }
+
+  @Override
+  public Statement apply(Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+        try (FlightServer flightServer =
+                 getStartServer(location ->
+                     initiateServer(location), 3)) {
+          LOGGER.info("Started " + FlightServer.class.getName() + " as " + 
flightServer);
+          base.evaluate();
+        } finally {
+          close();
+        }
+      }
+    };
+  }
+
+  private FlightServer getStartServer(CheckedFunction<Location, FlightServer> 
newServerFromLocation,
+                                      int retries)
+      throws IOException {
+
+    final Deque<ReflectiveOperationException> exceptions = new ArrayDeque<>();
+
+    for (; retries > 0; retries--) {
+      final Location location = Location.forGrpcInsecure(config.getHost(), 
config.getPort());
+      final FlightServer server = newServerFromLocation.apply(location);
+      try {
+        Method start = server.getClass().getMethod("start");
+        start.setAccessible(true);
+        start.invoke(server);
+        return server;
+      } catch (ReflectiveOperationException e) {
+        exceptions.add(e);
+      }
+    }
+
+    exceptions.forEach(
+        e -> LOGGER.error("Failed to start a new " + 
FlightServer.class.getName() + ".", e));
+    throw new IOException(exceptions.pop().getCause());
+  }
+
+  /**
+   * Gets the port to be used.
+   *
+   * @return the port value.
+   */
+  public int getPort() {
+    return config.getPort();
+  }
+
+  /**
+   * Gets the host to be used.
+   *
+   * @return the host value.
+   */
+  public String getHost() {
+    return config.getHost();
+  }
+
+  @Override
+  public void close() throws Exception {
+    allocator.getChildAllocators().forEach(BufferAllocator::close);
+    AutoCloseables.close(allocator);
+  }
+
+  /**
+   * Builder for {@link FlightServerTestRule}.
+   */
+  public static final class Builder {
+    private final Properties properties = new Properties();
+    private FlightSqlProducer producer;
+    private Authentication authentication;
+    private CertKeyPair certKeyPair;
+
+    /**
+     * Sets the host for the server rule.
+     *
+     * @param host the host value.
+     * @return the Builder.
+     */
+    public Builder host(final String host) {
+      
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST.camelName(),
+          host);
+      return this;
+    }
+
+    /**
+     * Sets a random port to be used by the server rule.
+     *
+     * @return the Builder.
+     */
+    public Builder randomPort() {
+      
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
+          FreePortFinder.findFreeLocalPort());

Review Comment:
   FWIW, instead of taking a dependency, just bind to port 0; then you can 
call `getPort()` after starting the server



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to