This is an automated email from the ASF dual-hosted git repository.

ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5aaf700  ARROW-4142: [Java] JDBC Array -> Arrow ListVector
5aaf700 is described below

commit 5aaf700e9684d93f8d413972d7718f271cbd8f96
Author: Mike Pigott <[email protected]>
AuthorDate: Tue Feb 19 15:59:47 2019 +0530

    ARROW-4142: [Java] JDBC Array -> Arrow ListVector
    
    https://issues.apache.org/jira/browse/ARROW-4142
    
    This adds support for reading JDBC arrays and converting them to Arrow 
ListVectors.  JDBC does not provide a great way to get the array type; there is 
a ResultSet object for walking the array, but its ResultSetMetaData may not 
contain the right value type (H2, for example, returns a JDBC type of NULL).
    
    This is based on #3134, which includes ARROW-3965 and ARROW-3966.
    
    I found Arrow arrays to be very confusing, and I am not sure if I am using 
them correctly here.  One thing I noticed was if I added a null array to a 
ListVector of VarCharVectors, the next value in the VarCharVector would be 
empty.  I would appreciate any help on why!  The ListVector unit tests weren't 
very helpful.
    
    For all other cases, this code seems to work.  I look forward to your 
review!
    
    Author: Mike Pigott <[email protected]>
    
    Closes #3294 from mikepigott/jdbc-array-field and squashes the following 
commits:
    
    66376dda <Mike Pigott> Support for reading Array records to ListVector from 
JDBC
---
 .../apache/arrow/adapter/jdbc/JdbcFieldInfo.java   | 114 ++++
 .../org/apache/arrow/adapter/jdbc/JdbcToArrow.java |  11 +-
 .../arrow/adapter/jdbc/JdbcToArrowConfig.java      |  77 ++-
 .../adapter/jdbc/JdbcToArrowConfigBuilder.java     |  35 +-
 .../arrow/adapter/jdbc/JdbcToArrowUtils.java       | 573 +++++++++++++--------
 .../arrow/adapter/jdbc/JdbcFieldInfoTest.java      |  44 ++
 .../arrow/adapter/jdbc/JdbcToArrowConfigTest.java  |  47 +-
 .../java/org/apache/arrow/adapter/jdbc/Table.java  |   6 +-
 .../adapter/jdbc/h2/JdbcToArrowArrayTest.java      | 374 ++++++++++++++
 9 files changed, 1043 insertions(+), 238 deletions(-)

diff --git 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfo.java
 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfo.java
new file mode 100644
index 0000000..db20bef
--- /dev/null
+++ 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents the information about a JDBC ResultSet Field that is
+ * needed to construct an {@link org.apache.arrow.vector.types.pojo.ArrowType}.
+ * Currently, this is:
+ * <ul>
+ *   <li>The JDBC {@link java.sql.Types} type.</li>
+ *   <li>The field's precision (used for {@link java.sql.Types#DECIMAL} and 
{@link java.sql.Types#NUMERIC} types)</li>
+ *   <li>The field's scale (used for {@link java.sql.Types#DECIMAL} and {@link 
java.sql.Types#NUMERIC} types)</li>
+ * </ul>
+ */
+public class JdbcFieldInfo {
+  private final int jdbcType;
+  private final int precision;
+  private final int scale;
+
+  /**
+   * Builds a <code>JdbcFieldInfo</code> using only the {@link java.sql.Types} 
type.  Do not use this constructor
+   * if the field type is {@link java.sql.Types#DECIMAL} or {@link 
java.sql.Types#NUMERIC}; the precision and
+   * scale will be set to <code>0</code>.
+   *
+   * @param jdbcType The {@link java.sql.Types} type.
+   * @throws IllegalArgumentException if jdbcType is {@link 
java.sql.Types#DECIMAL} or {@link java.sql.Types#NUMERIC}.
+   */
+  public JdbcFieldInfo(int jdbcType) {
+    Preconditions.checkArgument(
+        (jdbcType != Types.DECIMAL && jdbcType != Types.NUMERIC),
+        "DECIMAL and NUMERIC types require a precision and scale; please use 
another constructor.");
+
+    this.jdbcType = jdbcType;
+    this.precision = 0;
+    this.scale = 0;
+  }
+
+  /**
+   * Builds a <code>JdbcFieldInfo</code> from the {@link java.sql.Types} type, 
precision, and scale.
+   * Use this constructor for {@link java.sql.Types#DECIMAL} and {@link 
java.sql.Types#NUMERIC} types.
+   *
+   * @param jdbcType The {@link java.sql.Types} type.
+   * @param precision The field's numeric precision.
+   * @param scale The field's numeric scale.
+   */
+  public JdbcFieldInfo(int jdbcType, int precision, int scale) {
+    this.jdbcType = jdbcType;
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  /**
+   * Builds a <code>JdbcFieldInfo</code> from the corresponding {@link 
java.sql.ResultSetMetaData} column.
+   *
+   * @param rsmd The {@link java.sql.ResultSetMetaData} to get the field 
information from.
+   * @param column The column to get the field information for (on a 1-based 
index).
+   * @throws SQLException If the column information cannot be retrieved.
+   * @throws NullPointerException if <code>rsmd</code> is <code>null</code>.
+   * @throws IllegalArgumentException if <code>column</code> is out of bounds.
+   */
+  public JdbcFieldInfo(ResultSetMetaData rsmd, int column) throws SQLException 
{
+    Preconditions.checkNotNull(rsmd, "ResultSetMetaData cannot be null.");
+    Preconditions.checkArgument(column > 0, "ResultSetMetaData columns have 
indices starting at 1.");
+    Preconditions.checkArgument(
+        column <= rsmd.getColumnCount(),
+        "The index must be within the number of columns (1 to %s, inclusive)", 
rsmd.getColumnCount());
+
+    this.jdbcType = rsmd.getColumnType(column);
+    this.precision = rsmd.getPrecision(column);
+    this.scale = rsmd.getScale(column);
+  }
+
+  /**
+   * The {@link java.sql.Types} type.
+   */
+  public int getJdbcType() {
+    return jdbcType;
+  }
+
+  /**
+   * The numeric precision, for {@link java.sql.Types#NUMERIC}  and {@link 
java.sql.Types#DECIMAL} types.
+   */
+  public int getPrecision() {
+    return precision;
+  }
+
+  /**
+   * The numeric scale, for {@link java.sql.Types#NUMERIC}  and {@link 
java.sql.Types#DECIMAL} types.
+   */
+  public int getScale() {
+    return scale;
+  }
+}
diff --git 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
index 7910204..d5be486 100644
--- 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
+++ 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
@@ -88,7 +88,7 @@ public class JdbcToArrow {
     Preconditions.checkNotNull(allocator, "Memory allocator object can not be 
null");
 
     JdbcToArrowConfig config =
-            new JdbcToArrowConfig(allocator, 
JdbcToArrowUtils.getUtcCalendar(), false);
+            new JdbcToArrowConfig(allocator, 
JdbcToArrowUtils.getUtcCalendar());
     return sqlToArrow(connection, query, config);
   }
 
@@ -116,7 +116,7 @@ public class JdbcToArrow {
     Preconditions.checkNotNull(allocator, "Memory allocator object can not be 
null");
     Preconditions.checkNotNull(calendar, "Calendar object can not be null");
 
-    return sqlToArrow(connection, query, new JdbcToArrowConfig(allocator, 
calendar, false));
+    return sqlToArrow(connection, query, new JdbcToArrowConfig(allocator, 
calendar));
   }
 
   /**
@@ -170,7 +170,7 @@ public class JdbcToArrow {
     Preconditions.checkNotNull(allocator, "Memory Allocator object can not be 
null");
 
     JdbcToArrowConfig config = 
-            new JdbcToArrowConfig(allocator, 
JdbcToArrowUtils.getUtcCalendar(), false);
+            new JdbcToArrowConfig(allocator, 
JdbcToArrowUtils.getUtcCalendar());
     return sqlToArrow(resultSet, config);
   }
 
@@ -184,8 +184,7 @@ public class JdbcToArrow {
    */
   public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, Calendar 
calendar) throws SQLException, IOException {
     Preconditions.checkNotNull(resultSet, "JDBC ResultSet object can not be 
null");
-
-    return sqlToArrow(resultSet, new JdbcToArrowConfig(new 
RootAllocator(Integer.MAX_VALUE), calendar, false));
+    return sqlToArrow(resultSet, new JdbcToArrowConfig(new 
RootAllocator(Integer.MAX_VALUE), calendar));
   }
 
   /**
@@ -205,7 +204,7 @@ public class JdbcToArrow {
     Preconditions.checkNotNull(resultSet, "JDBC ResultSet object can not be 
null");
     Preconditions.checkNotNull(allocator, "Memory Allocator object can not be 
null");
 
-    return sqlToArrow(resultSet, new JdbcToArrowConfig(allocator, calendar, 
false));
+    return sqlToArrow(resultSet, new JdbcToArrowConfig(allocator, calendar));
   }
 
   /**
diff --git 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java
 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java
index 8f2a8ef..c5fccee 100644
--- 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java
+++ 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfig.java
@@ -18,6 +18,7 @@
 package org.apache.arrow.adapter.jdbc;
 
 import java.util.Calendar;
+import java.util.Map;
 
 import org.apache.arrow.memory.BaseAllocator;
 
@@ -28,16 +29,29 @@ import com.google.common.base.Preconditions;
  * <p>
  * The allocator is used to construct the {@link 
org.apache.arrow.vector.VectorSchemaRoot},
  * and the calendar is used to define the time zone of any {@link 
org.apahe.arrow.vector.pojo.ArrowType.Timestamp}
- * fields that are created during the conversion.
+ * fields that are created during the conversion.  Neither field may be 
<code>null</code>.
  * </p>
  * <p>
- * Neither field may be <code>null</code>.
+ * If the <code>includeMetadata</code> flag is set, the Arrow field metadata 
will contain information
+ * from the corresponding {@link java.sql.ResultSetMetaData} that was used to 
create the
+ * {@link org.apache.arrow.vector.types.pojo.FieldType} of the corresponding
+ * {@link org.apache.arrow.vector.FieldVector}.
+ * </p>
+ * <p>
+ * If there are any {@link java.sql.Types#ARRAY} fields in the {@link 
java.sql.ResultSet}, the corresponding
+ * {@link JdbcFieldInfo} for the array's contents must be defined here.  
Unfortunately, the sub-type
+ * information cannot be retrieved from all JDBC implementations (H2 for 
example, returns
+ * {@link java.sql.Types#NULL} for the array sub-type), so it must be 
configured here.  The column index
+ * or name can be used to map to a {@link JdbcFieldInfo}, and that will be 
used for the conversion.
  * </p>
  */
 public final class JdbcToArrowConfig {
+
   private Calendar calendar;
   private BaseAllocator allocator;
   private boolean includeMetadata;
+  private Map<Integer, JdbcFieldInfo> arraySubTypesByColumnIndex;
+  private Map<String, JdbcFieldInfo> arraySubTypesByColumnName;
 
   /**
    * Constructs a new configuration from the provided allocator and calendar.  
The <code>allocator</code>
@@ -46,20 +60,47 @@ public final class JdbcToArrowConfig {
    *
    * @param allocator       The memory allocator to construct the Arrow 
vectors with.
    * @param calendar        The calendar to use when constructing Timestamp 
fields and reading time-based results.
-   * @param includeMetadata Whether to include JDBC field metadata in the 
Arrow Schema Field metadata.
    */
-  JdbcToArrowConfig(BaseAllocator allocator, Calendar calendar, boolean 
includeMetadata) {
+  JdbcToArrowConfig(BaseAllocator allocator, Calendar calendar) {
     Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
 
     this.allocator = allocator;
     this.calendar = calendar;
+    this.includeMetadata = false;
+    this.arraySubTypesByColumnIndex = null;
+    this.arraySubTypesByColumnName = null;
+  }
+
+  /**
+   * Constructs a new configuration from the provided allocator and calendar.  
The <code>allocator</code>
+   * is used when constructing the Arrow vectors from the ResultSet, and the 
calendar is used to define
+   * Arrow Timestamp fields, and to read time-based fields from the JDBC 
<code>ResultSet</code>. 
+   *
+   * @param allocator       The memory allocator to construct the Arrow 
vectors with.
+   * @param calendar        The calendar to use when constructing Timestamp 
fields and reading time-based results.
+   * @param includeMetadata Whether to include JDBC field metadata in the 
Arrow Schema Field metadata.
+   * @param arraySubTypesByColumnIndex The type of the JDBC array at the 
column index (1-based).
+   * @param arraySubTypesByColumnName  The type of the JDBC array at the 
column name.
+   */
+  JdbcToArrowConfig(
+      BaseAllocator allocator,
+      Calendar calendar,
+      boolean includeMetadata,
+      Map<Integer, JdbcFieldInfo> arraySubTypesByColumnIndex,
+      Map<String, JdbcFieldInfo> arraySubTypesByColumnName) {
+
+    this(allocator, calendar);
+
     this.includeMetadata = includeMetadata;
+    this.arraySubTypesByColumnIndex = arraySubTypesByColumnIndex;
+    this.arraySubTypesByColumnName = arraySubTypesByColumnName;
   }
 
   /**
    * The calendar to use when defining Arrow Timestamp fields
    * and retrieving {@link Date}, {@link Time}, or {@link Timestamp}
    * data types from the {@link ResultSet}, or <code>null</code> if not 
converting.
+   *
    * @return the calendar.
    */
   public Calendar getCalendar() {
@@ -82,4 +123,32 @@ public final class JdbcToArrowConfig {
   public boolean shouldIncludeMetadata() {
     return includeMetadata;
   }
+
+  /**
+   * Returns the array sub-type {@link JdbcFieldInfo} defined for the provided 
column index.
+   *
+   * @param index The {@link java.sql.ResultSetMetaData} column index of an 
{@link java.sql.Types#ARRAY} type.
+   * @return The {@link JdbcFieldInfo} for that array's sub-type, or 
<code>null</code> if not defined.
+   */
+  public JdbcFieldInfo getArraySubTypeByColumnIndex(int index) {
+    if (arraySubTypesByColumnIndex == null) {
+      return null;
+    } else {
+      return arraySubTypesByColumnIndex.get(index);
+    }
+  }
+
+  /**
+   * Returns the array sub-type {@link JdbcFieldInfo} defined for the provided 
column name.
+   *
+   * @param name The {@link java.sql.ResultSetMetaData} column name of an 
{@link java.sql.Types#ARRAY} type.
+   * @return The {@link JdbcFieldInfo} for that array's sub-type, or 
<code>null</code> if not defined.
+   */
+  public JdbcFieldInfo getArraySubTypeByColumnName(String name) {
+    if (arraySubTypesByColumnName == null) {
+      return null;
+    } else {
+      return arraySubTypesByColumnName.get(name);
+    }
+  }
 }
diff --git 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java
 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java
index 51327aa..ea351d8 100644
--- 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java
+++ 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.arrow.adapter.jdbc;
 
 import java.util.Calendar;
+import java.util.Map;
 
 import org.apache.arrow.memory.BaseAllocator;
 
@@ -30,6 +31,8 @@ public class JdbcToArrowConfigBuilder {
   private Calendar calendar;
   private BaseAllocator allocator;
   private boolean includeMetadata;
+  private Map<Integer, JdbcFieldInfo> arraySubTypesByColumnIndex;
+  private Map<String, JdbcFieldInfo> arraySubTypesByColumnName;
 
   /**
    * Default constructor for the <code>JdbcToArrowConfigBuilder}</code>.
@@ -40,6 +43,8 @@ public class JdbcToArrowConfigBuilder {
     this.allocator = null;
     this.calendar = null;
     this.includeMetadata = false;
+    this.arraySubTypesByColumnIndex = null;
+    this.arraySubTypesByColumnName = null;
   }
 
   /**
@@ -127,6 +132,29 @@ public class JdbcToArrowConfigBuilder {
   }
 
   /**
+   * Sets the mapping of column-index-to-{@link JdbcFieldInfo} used for 
columns of type {@link java.sql.Types#ARRAY}.
+   * The column index is 1-based, to match the JDBC column index.
+   *
+   * @param map The mapping.
+   * @return This instance of the <code>JdbcToArrowConfigBuilder</code>, for chaining.
+   */
+  public JdbcToArrowConfigBuilder setArraySubTypeByColumnIndexMap(Map<Integer, 
JdbcFieldInfo> map) {
+    this.arraySubTypesByColumnIndex = map;
+    return this;
+  }
+
+  /**
+   * Sets the mapping of column-name-to-{@link JdbcFieldInfo} used for columns 
of type {@link java.sql.Types#ARRAY}.
+   *
+   * @param map The mapping.
+   * @return This instance of the <code>JdbcToArrowConfigBuilder</code>, for chaining.
+   */
+  public JdbcToArrowConfigBuilder setArraySubTypeByColumnNameMap(Map<String, 
JdbcFieldInfo> map) {
+    this.arraySubTypesByColumnName = map;
+    return this;
+  }
+
+  /**
    * This builds the {@link JdbcToArrowConfig} from the provided
    * {@link BaseAllocator} and {@link Calendar}.
    *
@@ -134,6 +162,11 @@ public class JdbcToArrowConfigBuilder {
    * @throws NullPointerException if either the allocator or calendar was not 
set.
    */
   public JdbcToArrowConfig build() {
-    return new JdbcToArrowConfig(allocator, calendar, includeMetadata);
+    return new JdbcToArrowConfig(
+        allocator,
+        calendar,
+        includeMetadata,
+        arraySubTypesByColumnIndex,
+        arraySubTypesByColumnName);
   }
 }
diff --git 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
index 833ca84..f54363f 100644
--- 
a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
+++ 
b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.sql.Array;
 import java.sql.Blob;
 import java.sql.Clob;
 import java.sql.Date;
@@ -59,6 +60,7 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.holders.NullableBigIntHolder;
 import org.apache.arrow.vector.holders.NullableBitHolder;
 import org.apache.arrow.vector.holders.NullableDateMilliHolder;
@@ -95,6 +97,15 @@ public class JdbcToArrowUtils {
   private static final int DEFAULT_STREAM_BUFFER_SIZE = 1024;
   private static final int DEFAULT_CLOB_SUBSTRING_READ_SIZE = 256;
 
+  private static final int JDBC_ARRAY_VALUE_COLUMN = 2;
+
+  /**
+   * Returns an instance of {@link java.util.Calendar} with the UTC time zone and 
root locale.
+   */
+  public static Calendar getUtcCalendar() {
+    return Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
+  }
+
   /**
    * Create Arrow {@link Schema} object for the given JDBC {@link 
ResultSetMetaData}.
    *
@@ -107,67 +118,46 @@ public class JdbcToArrowUtils {
     Preconditions.checkNotNull(rsmd, "JDBC ResultSetMetaData object can't be 
null");
     Preconditions.checkNotNull(calendar, "Calendar object can't be null");
 
-    return jdbcToArrowSchema(rsmd, new JdbcToArrowConfig(new RootAllocator(0), 
calendar, false));
-  }
-
-  /**
-   * Returns the instance of a {java.util.Calendar} with the UTC time zone and 
root locale.
-   */
-  public static Calendar getUtcCalendar() {
-    return Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
+    return jdbcToArrowSchema(rsmd, new JdbcToArrowConfig(new RootAllocator(0), 
calendar));
   }
 
   /**
-   * Create Arrow {@link Schema} object for the given JDBC {@link 
ResultSetMetaData}.
-   *
-   * <p>This method currently performs following type mapping for JDBC SQL 
data types to corresponding Arrow data types.
-   *
-   * <p>CHAR --> ArrowType.Utf8
-   * NCHAR --> ArrowType.Utf8
-   * VARCHAR --> ArrowType.Utf8
-   * NVARCHAR --> ArrowType.Utf8
-   * LONGVARCHAR --> ArrowType.Utf8
-   * LONGNVARCHAR --> ArrowType.Utf8
-   * NUMERIC --> ArrowType.Decimal(precision, scale)
-   * DECIMAL --> ArrowType.Decimal(precision, scale)
-   * BIT --> ArrowType.Bool
-   * TINYINT --> ArrowType.Int(8, signed)
-   * SMALLINT --> ArrowType.Int(16, signed)
-   * INTEGER --> ArrowType.Int(32, signed)
-   * BIGINT --> ArrowType.Int(64, signed)
-   * REAL --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-   * FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
-   * DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
-   * BINARY --> ArrowType.Binary
-   * VARBINARY --> ArrowType.Binary
-   * LONGVARBINARY --> ArrowType.Binary
-   * DATE --> ArrowType.Date(DateUnit.MILLISECOND)
-   * TIME --> ArrowType.Time(TimeUnit.MILLISECOND, 32)
-   * TIMESTAMP --> ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone=null)
-   * CLOB --> ArrowType.Utf8
-   * BLOB --> ArrowType.Binary
+   * Create Arrow {@link Schema} object for the given JDBC {@link 
java.sql.ResultSetMetaData}.
+   * <p>
+   * The {@link JdbcToArrowUtils#getArrowTypeForJdbcField(JdbcFieldInfo, 
Calendar)} method is used to construct a
+   * {@link org.apache.arrow.vector.types.pojo.ArrowType} for each field in 
the {@link java.sql.ResultSetMetaData}.
+   * </p>
+   * <p>
+   * If {@link JdbcToArrowConfig#shouldIncludeMetadata()} returns 
<code>true</code>, the following fields
+   * will be added to the {@link FieldType#getMetadata()}:
+   * <ul>
+   *  <li>{@link Constants#SQL_CATALOG_NAME_KEY} representing {@link 
ResultSetMetaData#getCatalogName(int)}</li>
+   *  <li>{@link Constants#SQL_TABLE_NAME_KEY} representing {@link 
ResultSetMetaData#getTableName(int)}</li>
+   *  <li>{@link Constants#SQL_COLUMN_NAME_KEY} representing {@link 
ResultSetMetaData#getColumnName(int)}</li>
+   *  <li>{@link Constants#SQL_TYPE_KEY} representing {@link 
ResultSetMetaData#getColumnTypeName(int)}</li>
+   * </ul>
+   * </p>
+   * <p>
+   * If any columns are of type {@link java.sql.Types#ARRAY}, the 
configuration object will be used to look up
+   * the array sub-type field.  The {@link 
JdbcToArrowConfig#getArraySubTypeByColumnIndex(int)} method will be
+   * checked first, followed by the {@link 
JdbcToArrowConfig#getArraySubTypeByColumnName(String)} method.
+   * </p>
    *
    * @param rsmd The ResultSetMetaData containing the results, to read the 
JDBC metadata from.
    * @param config The configuration to use when constructing the schema.
    * @return {@link Schema}
    * @throws SQLException on error
+   * @throws IllegalArgumentException if <code>rsmd</code> contains an {@link 
java.sql.Types#ARRAY} but the
+   *                                  <code>config</code> does not have a 
sub-type definition for it.
    */
   public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, 
JdbcToArrowConfig config) throws SQLException {
     Preconditions.checkNotNull(rsmd, "JDBC ResultSetMetaData object can't be 
null");
     Preconditions.checkNotNull(config, "The configuration object must not be 
null");
 
-    final String timezone;
-    if (config.getCalendar() != null) {
-      timezone = config.getCalendar().getTimeZone().getID();
-    } else {
-      timezone = null;
-    }
-
     List<Field> fields = new ArrayList<>();
     int columnCount = rsmd.getColumnCount();
     for (int i = 1; i <= columnCount; i++) {
       final String columnName = rsmd.getColumnName(i);
-      final FieldType fieldType;
 
       final Map<String, String> metadata;
       if (config.shouldIncludeMetadata()) {
@@ -181,83 +171,172 @@ public class JdbcToArrowUtils {
         metadata = null;
       }
 
-      switch (rsmd.getColumnType(i)) {
-        case Types.BOOLEAN:
-        case Types.BIT:
-          fieldType = new FieldType(true, new ArrowType.Bool(), null, 
metadata);
-          break;
-        case Types.TINYINT:
-          fieldType = new FieldType(true, new ArrowType.Int(8, true), null, 
metadata);
-          break;
-        case Types.SMALLINT:
-          fieldType = new FieldType(true, new ArrowType.Int(16, true), null, 
metadata);
-          break;
-        case Types.INTEGER:
-          fieldType = new FieldType(true, new ArrowType.Int(32, true), null, 
metadata);
-          break;
-        case Types.BIGINT:
-          fieldType = new FieldType(true, new ArrowType.Int(64, true), null, 
metadata);
-          break;
-        case Types.NUMERIC:
-        case Types.DECIMAL:
-          int precision = rsmd.getPrecision(i);
-          int scale = rsmd.getScale(i);
-          fieldType = new FieldType(true, new ArrowType.Decimal(precision, 
scale), null, metadata);
-          break;
-        case Types.REAL:
-        case Types.FLOAT:
-          fieldType = new FieldType(true, new ArrowType.FloatingPoint(SINGLE), 
null, metadata);
-          break;
-        case Types.DOUBLE:
-          fieldType = new FieldType(true, new ArrowType.FloatingPoint(DOUBLE), 
null, metadata);
-          break;
-        case Types.CHAR:
-        case Types.NCHAR:
-        case Types.VARCHAR:
-        case Types.NVARCHAR:
-        case Types.LONGVARCHAR:
-        case Types.LONGNVARCHAR:
-        case Types.CLOB:
-          fieldType = new FieldType(true, new ArrowType.Utf8(), null, 
metadata);
-          break;
-        case Types.DATE:
-          fieldType = new FieldType(true, new 
ArrowType.Date(DateUnit.MILLISECOND), null, metadata);
-          break;
-        case Types.TIME:
-          fieldType = new FieldType(true, new 
ArrowType.Time(TimeUnit.MILLISECOND, 32), null, metadata);
-          break;
-        case Types.TIMESTAMP:
-          fieldType =
-              new FieldType(
-                  true,
-                  new ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone),
-                  null,
-                  metadata);
-          break;
-        case Types.BINARY:
-        case Types.VARBINARY:
-        case Types.LONGVARBINARY:
-        case Types.BLOB:
-          fieldType = new FieldType(true, new ArrowType.Binary(), null, 
metadata);
-          break;
-
-        case Types.ARRAY:
-            // TODO Need to handle this type
-            // fields.add(new Field("list", FieldType.nullable(new 
ArrowType.List()), null));
-        default:
-          // no-op, shouldn't get here
-          fieldType = null;
-          break;
-      }
+      final ArrowType arrowType = getArrowTypeForJdbcField(new 
JdbcFieldInfo(rsmd, i), config.getCalendar());
+      if (arrowType != null) {
+        final FieldType fieldType = new FieldType(true, arrowType, /* 
dictionary encoding */ null, metadata);
+
+        List<Field> children = null;
+        if (arrowType.getTypeID() == ArrowType.List.TYPE_TYPE) {
+          final JdbcFieldInfo arrayFieldInfo = 
getJdbcFieldInfoForArraySubType(rsmd, i, config);
+          if (arrayFieldInfo == null) {
+            throw new IllegalArgumentException("Configuration does not provide 
a mapping for array column " + i);
+          }
+          children = new ArrayList<Field>();
+          final ArrowType childType =
+              getArrowTypeForJdbcField(arrayFieldInfo, config.getCalendar());
+          children.add(new Field("child", FieldType.nullable(childType), 
null));
+        }
 
-      if (fieldType != null) {
-        fields.add(new Field(columnName, fieldType, null));
+        fields.add(new Field(columnName, fieldType, children));
       }
     }
 
     return new Schema(fields, null);
   }
 
+  /**
+   * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType}
+   * from the {@link JdbcFieldInfo} and {@link java.util.Calendar}.
+   *
+   * <p>This method currently performs following type mapping for JDBC SQL 
data types to corresponding Arrow data types.
+   *
+   * <ul>
+   *   <li>CHAR --> ArrowType.Utf8</li>
+   *   <li>NCHAR --> ArrowType.Utf8</li>
+   *   <li>VARCHAR --> ArrowType.Utf8</li>
+   *   <li>NVARCHAR --> ArrowType.Utf8</li>
+   *   <li>LONGVARCHAR --> ArrowType.Utf8</li>
+   *   <li>LONGNVARCHAR --> ArrowType.Utf8</li>
+   *   <li>NUMERIC --> ArrowType.Decimal(precision, scale)</li>
+   *   <li>DECIMAL --> ArrowType.Decimal(precision, scale)</li>
+   *   <li>BIT --> ArrowType.Bool</li>
+   *   <li>TINYINT --> ArrowType.Int(8, signed)</li>
+   *   <li>SMALLINT --> ArrowType.Int(16, signed)</li>
+   *   <li>INTEGER --> ArrowType.Int(32, signed)</li>
+   *   <li>BIGINT --> ArrowType.Int(64, signed)</li>
+   *   <li>REAL --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+   *   <li>FLOAT --> 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+   *   <li>DOUBLE --> 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
+   *   <li>BINARY --> ArrowType.Binary</li>
+   *   <li>VARBINARY --> ArrowType.Binary</li>
+   *   <li>LONGVARBINARY --> ArrowType.Binary</li>
+   *   <li>DATE --> ArrowType.Date(DateUnit.MILLISECOND)</li>
+   *   <li>TIME --> ArrowType.Time(TimeUnit.MILLISECOND, 32)</li>
+   *   <li>TIMESTAMP --> ArrowType.Timestamp(TimeUnit.MILLISECOND, calendar 
timezone)</li>
+   *   <li>CLOB --> ArrowType.Utf8</li>
+   *   <li>BLOB --> ArrowType.Binary</li>
+   * </ul>
+   *
+   * @param fieldInfo The field information to construct the 
<code>ArrowType</code> from.
+   * @param calendar The calendar to use when constructing the 
<code>ArrowType.Timestamp</code>
+   *                 for {@link java.sql.Types#TIMESTAMP} types.
+   * @return The corresponding <code>ArrowType</code>.
+   * @throws NullPointerException if <code>fieldInfo</code> is <code>null</code>.
+   */
+  public static ArrowType getArrowTypeForJdbcField(JdbcFieldInfo fieldInfo, 
Calendar calendar) {
+    Preconditions.checkNotNull(fieldInfo, "JdbcFieldInfo object cannot be 
null");
+
+    final String timezone;
+    if (calendar != null) {
+      timezone = calendar.getTimeZone().getID();
+    } else {
+      timezone = null;
+    }
+
+
+    final ArrowType arrowType;
+
+    switch (fieldInfo.getJdbcType()) {
+      case Types.BOOLEAN:
+      case Types.BIT:
+        arrowType = new ArrowType.Bool();
+        break;
+      case Types.TINYINT:
+        arrowType = new ArrowType.Int(8, true);
+        break;
+      case Types.SMALLINT:
+        arrowType = new ArrowType.Int(16, true);
+        break;
+      case Types.INTEGER:
+        arrowType = new ArrowType.Int(32, true);
+        break;
+      case Types.BIGINT:
+        arrowType = new ArrowType.Int(64, true);
+        break;
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        int precision = fieldInfo.getPrecision();
+        int scale = fieldInfo.getScale();
+        arrowType = new ArrowType.Decimal(precision, scale);
+        break;
+      case Types.REAL:
+      case Types.FLOAT:
+        arrowType = new ArrowType.FloatingPoint(SINGLE);
+        break;
+      case Types.DOUBLE:
+        arrowType = new ArrowType.FloatingPoint(DOUBLE);
+        break;
+      case Types.CHAR:
+      case Types.NCHAR:
+      case Types.VARCHAR:
+      case Types.NVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.LONGNVARCHAR:
+      case Types.CLOB:
+        arrowType = new ArrowType.Utf8();
+        break;
+      case Types.DATE:
+        arrowType = new ArrowType.Date(DateUnit.MILLISECOND);
+        break;
+      case Types.TIME:
+        arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+        break;
+      case Types.TIMESTAMP:
+        arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone);
+        break;
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.LONGVARBINARY:
+      case Types.BLOB:
+        arrowType = new ArrowType.Binary();
+        break;
+      case Types.ARRAY:
+        arrowType = new ArrowType.List();
+        break;
+      default:
+        // no-op, shouldn't get here
+        arrowType = null;
+        break;
+    }
+
+    return arrowType;
+  }
+
+  /* Uses the configuration to determine what the array sub-type JdbcFieldInfo 
is.
+   * If no sub-type can be found, returns null.
+   */
+  private static JdbcFieldInfo getJdbcFieldInfoForArraySubType(
+      ResultSetMetaData rsmd,
+      int arrayColumn,
+      JdbcToArrowConfig config)
+          throws SQLException {
+
+    Preconditions.checkNotNull(rsmd, "ResultSet MetaData object cannot be 
null");
+    Preconditions.checkNotNull(config, "Configuration must not be null");
+    Preconditions.checkArgument(
+        arrayColumn > 0,
+        "ResultSetMetaData columns start with 1; column cannot be less than 
1");
+    Preconditions.checkArgument(
+        arrayColumn <= rsmd.getColumnCount(),
+        "Column number cannot be more than the number of columns");
+
+    JdbcFieldInfo fieldInfo = config.getArraySubTypeByColumnIndex(arrayColumn);
+    if (fieldInfo == null) {
+      fieldInfo = 
config.getArraySubTypeByColumnName(rsmd.getColumnName(arrayColumn));
+    }
+    return fieldInfo;
+  }
+
   private static void allocateVectors(VectorSchemaRoot root, int size) {
     List<FieldVector> vectors = root.getFieldVectors();
     for (FieldVector fieldVector : vectors) {
@@ -284,9 +363,10 @@ public class JdbcToArrowUtils {
       throws SQLException, IOException {
 
     Preconditions.checkNotNull(rs, "JDBC ResultSet object can't be null");
-    Preconditions.checkNotNull(root, "JDBC ResultSet object can't be null");
+    Preconditions.checkNotNull(root, "Vector Schema cannot be null");
+    Preconditions.checkNotNull(calendar, "Calendar object can't be null");
 
-    jdbcToArrowVectors(rs, root, new JdbcToArrowConfig(new RootAllocator(0), 
calendar, false));
+    jdbcToArrowVectors(rs, root, new JdbcToArrowConfig(new RootAllocator(0), 
calendar));
   }
 
   /**
@@ -310,117 +390,133 @@ public class JdbcToArrowUtils {
 
     allocateVectors(root, DEFAULT_BUFFER_SIZE);
 
-    final Calendar calendar = config.getCalendar();
-
     int rowCount = 0;
     while (rs.next()) {
       for (int i = 1; i <= columnCount; i++) {
-        String columnName = rsmd.getColumnName(i);
-        switch (rsmd.getColumnType(i)) {
-          case Types.BOOLEAN:
-          case Types.BIT:
-            updateVector((BitVector) root.getVector(columnName),
-                    rs.getBoolean(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.TINYINT:
-            updateVector((TinyIntVector) root.getVector(columnName),
-                    rs.getInt(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.SMALLINT:
-            updateVector((SmallIntVector) root.getVector(columnName),
-                    rs.getInt(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.INTEGER:
-            updateVector((IntVector) root.getVector(columnName),
-                    rs.getInt(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.BIGINT:
-            updateVector((BigIntVector) root.getVector(columnName),
-                    rs.getLong(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.NUMERIC:
-          case Types.DECIMAL:
-            updateVector((DecimalVector) root.getVector(columnName),
-                    rs.getBigDecimal(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.REAL:
-          case Types.FLOAT:
-            updateVector((Float4Vector) root.getVector(columnName),
-                    rs.getFloat(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.DOUBLE:
-            updateVector((Float8Vector) root.getVector(columnName),
-                    rs.getDouble(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.CHAR:
-          case Types.NCHAR:
-          case Types.VARCHAR:
-          case Types.NVARCHAR:
-          case Types.LONGVARCHAR:
-          case Types.LONGNVARCHAR:
-            updateVector((VarCharVector) root.getVector(columnName),
-                    rs.getString(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.DATE:
-            final Date date;
-            if (calendar != null) {
-              date = rs.getDate(i, calendar);
-            } else {
-              date = rs.getDate(i);
-            }
-
-            updateVector((DateMilliVector) root.getVector(columnName), date, 
!rs.wasNull(), rowCount);
-            break;
-          case Types.TIME:
-            final Time time;
-            if (calendar != null) {
-              time = rs.getTime(i, calendar);
-            } else {
-              time = rs.getTime(i);
-            }
-
-            updateVector((TimeMilliVector) root.getVector(columnName), time, 
!rs.wasNull(), rowCount);
-            break;
-          case Types.TIMESTAMP:
-            final Timestamp ts;
-            if (calendar != null) {
-              ts = rs.getTimestamp(i, calendar);
-            } else {
-              ts = rs.getTimestamp(i);
-            }
-
-            // TODO: Need to handle precision such as milli, micro, nano
-            updateVector((TimeStampVector) root.getVector(columnName), ts, 
!rs.wasNull(), rowCount);
-            break;
-          case Types.BINARY:
-          case Types.VARBINARY:
-          case Types.LONGVARBINARY:
-            updateVector((VarBinaryVector) root.getVector(columnName),
-                    rs.getBinaryStream(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.ARRAY:
-            // TODO Need to handle this type
-            // fields.add(new Field("list", FieldType.nullable(new 
ArrowType.List()), null));
-            break;
-          case Types.CLOB:
-            updateVector((VarCharVector) root.getVector(columnName),
-                    rs.getClob(i), !rs.wasNull(), rowCount);
-            break;
-          case Types.BLOB:
-            updateVector((VarBinaryVector) root.getVector(columnName),
-                    rs.getBlob(i), !rs.wasNull(), rowCount);
-            break;
-
-          default:
-            // no-op, shouldn't get here
-            break;
-        }
+        jdbcToFieldVector(
+            rs,
+            i,
+            rs.getMetaData().getColumnType(i),
+            rowCount,
+            root.getVector(rsmd.getColumnName(i)),
+            config);
       }
       rowCount++;
     }
     root.setRowCount(rowCount);
   }
 
+  private static void jdbcToFieldVector(
+      ResultSet rs,
+      int columnIndex,
+      int jdbcColType,
+      int rowCount,
+      FieldVector vector,
+      JdbcToArrowConfig config)
+          throws SQLException, IOException {
+
+    final Calendar calendar = config.getCalendar();
+
+    switch (jdbcColType) {
+      case Types.BOOLEAN:
+      case Types.BIT:
+        updateVector((BitVector) vector,
+                rs.getBoolean(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.TINYINT:
+        updateVector((TinyIntVector) vector,
+                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.SMALLINT:
+        updateVector((SmallIntVector) vector,
+                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.INTEGER:
+        updateVector((IntVector) vector,
+                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.BIGINT:
+        updateVector((BigIntVector) vector,
+                rs.getLong(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+        updateVector((DecimalVector) vector,
+                rs.getBigDecimal(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.REAL:
+      case Types.FLOAT:
+        updateVector((Float4Vector) vector,
+                rs.getFloat(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.DOUBLE:
+        updateVector((Float8Vector) vector,
+                rs.getDouble(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.CHAR:
+      case Types.NCHAR:
+      case Types.VARCHAR:
+      case Types.NVARCHAR:
+      case Types.LONGVARCHAR:
+      case Types.LONGNVARCHAR:
+        updateVector((VarCharVector) vector,
+                rs.getString(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.DATE:
+        final Date date;
+        if (calendar != null) {
+          date = rs.getDate(columnIndex, calendar);
+        } else {
+          date = rs.getDate(columnIndex);
+        }
+
+        updateVector((DateMilliVector) vector, date, !rs.wasNull(), rowCount);
+        break;
+      case Types.TIME:
+        final Time time;
+        if (calendar != null) {
+          time = rs.getTime(columnIndex, calendar);
+        } else {
+          time = rs.getTime(columnIndex);
+        }
+
+        updateVector((TimeMilliVector) vector, time, !rs.wasNull(), rowCount);
+        break;
+      case Types.TIMESTAMP:
+        final Timestamp ts;
+        if (calendar != null) {
+          ts = rs.getTimestamp(columnIndex, calendar);
+        } else {
+          ts = rs.getTimestamp(columnIndex);
+        }
+
+        // TODO: Need to handle precision such as milli, micro, nano
+        updateVector((TimeStampVector) vector, ts, !rs.wasNull(), rowCount);
+        break;
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.LONGVARBINARY:
+        updateVector((VarBinaryVector) vector,
+                rs.getBinaryStream(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.ARRAY:
+        updateVector((ListVector) vector, rs, columnIndex, rowCount, config);
+        break;
+      case Types.CLOB:
+        updateVector((VarCharVector) vector,
+                rs.getClob(columnIndex), !rs.wasNull(), rowCount);
+        break;
+      case Types.BLOB:
+        updateVector((VarBinaryVector) vector,
+                rs.getBlob(columnIndex), !rs.wasNull(), rowCount);
+        break;
+
+      default:
+        // no-op, shouldn't get here
+        break;
+    }
+  }
+
   private static void updateVector(BitVector bitVector, boolean value, boolean 
isNonNull, int rowCount) {
     NullableBitHolder holder = new NullableBitHolder();
     holder.isSet = isNonNull ? 1 : 0;
@@ -620,4 +716,45 @@ public class JdbcToArrowUtils {
     updateVector(varBinaryVector, blob != null ? blob.getBinaryStream() : 
null, isNonNull, rowCount);
   }
 
+  private static void updateVector(
+      ListVector listVector,
+      ResultSet resultSet,
+      int arrayIndex,
+      int rowCount,
+      JdbcToArrowConfig config)
+      throws SQLException, IOException {
+
+    final JdbcFieldInfo fieldInfo = 
getJdbcFieldInfoForArraySubType(resultSet.getMetaData(), arrayIndex, config);
+    if (fieldInfo == null) {
+      throw new IllegalArgumentException("Column " + arrayIndex + " is an 
array of unknown type.");
+    }
+
+    final int valueCount = listVector.getValueCount();
+    final Array array = resultSet.getArray(arrayIndex);
+
+    FieldVector fieldVector = listVector.getDataVector();
+    int arrayRowCount = 0;
+
+    if (!resultSet.wasNull()) {
+      listVector.startNewValue(rowCount);
+
+      try (ResultSet rs = array.getResultSet()) {
+
+        while (rs.next()) {
+          jdbcToFieldVector(
+              rs,
+              JDBC_ARRAY_VALUE_COLUMN,
+              fieldInfo.getJdbcType(),
+              valueCount + arrayRowCount,
+              fieldVector,
+              config);
+          arrayRowCount++;
+        }
+      }
+
+      listVector.endValue(rowCount, arrayRowCount);
+    }
+
+    listVector.setValueCount(valueCount + arrayRowCount);
+  }
 }
diff --git 
a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfoTest.java
 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfoTest.java
new file mode 100644
index 0000000..3d6074b
--- /dev/null
+++ 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcFieldInfoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import static org.junit.Assert.*;
+
+import java.sql.Types;
+
+import org.junit.Test;
+
+public class JdbcFieldInfoTest {
+
+  @Test
+  public void testCreateJdbcFieldInfoWithJdbcType() {
+    JdbcFieldInfo fieldInfo = new JdbcFieldInfo(Types.BLOB);
+
+    assertEquals(Types.BLOB, fieldInfo.getJdbcType());
+    assertEquals(0, fieldInfo.getPrecision());
+    assertEquals(0, fieldInfo.getScale());
+  }
+
+  public void testCreateJdbcFieldInfoWithJdbcTypePrecisionAndScale() {
+    JdbcFieldInfo fieldInfo = new JdbcFieldInfo(Types.BLOB, 1, 2);
+
+    assertEquals(Types.BLOB, fieldInfo.getJdbcType());
+    assertEquals(1, fieldInfo.getPrecision());
+    assertEquals(2, fieldInfo.getScale());
+  }
+}
diff --git 
a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigTest.java
 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigTest.java
index bafb2dc..46d8b04 100644
--- 
a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigTest.java
+++ 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigTest.java
@@ -19,7 +19,9 @@ package org.apache.arrow.adapter.jdbc;
 
 import static org.junit.Assert.*;
 
+import java.sql.Types;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.TimeZone;
 
@@ -34,7 +36,7 @@ public class JdbcToArrowConfigTest {
 
   @Test(expected = NullPointerException.class)
   public void testConfigNullArguments() {
-    new JdbcToArrowConfig(null, null, false);
+    new JdbcToArrowConfig(null, null);
   }
 
   @Test(expected = NullPointerException.class)
@@ -43,7 +45,7 @@ public class JdbcToArrowConfigTest {
   }
 
   public void testConfigNullCalendar() {
-    JdbcToArrowConfig config = new JdbcToArrowConfig(allocator, null, false);
+    JdbcToArrowConfig config = new JdbcToArrowConfig(allocator, null);
     assertNull(config.getCalendar());
   }
 
@@ -56,7 +58,7 @@ public class JdbcToArrowConfigTest {
 
   @Test(expected = NullPointerException.class)
   public void testConfigNullAllocator() {
-    new JdbcToArrowConfig(null, calendar, false);
+    new JdbcToArrowConfig(null, calendar);
   }
 
   @Test(expected = NullPointerException.class)
@@ -95,7 +97,8 @@ public class JdbcToArrowConfigTest {
     assertTrue(newCalendar == config.getCalendar());
   }
 
-  @Test public void testIncludeMetadata() {
+  @Test
+  public void testIncludeMetadata() {
     JdbcToArrowConfigBuilder builder = new JdbcToArrowConfigBuilder(allocator, 
calendar, false);
 
     JdbcToArrowConfig config = builder.build();
@@ -108,10 +111,42 @@ public class JdbcToArrowConfigTest {
     config = new JdbcToArrowConfigBuilder(allocator, calendar, true).build();
     assertTrue(config.shouldIncludeMetadata());
 
-    config = new JdbcToArrowConfig(allocator, calendar, true);
+    config = new JdbcToArrowConfig(allocator, calendar, true, null, null);
     assertTrue(config.shouldIncludeMetadata());
 
-    config = new JdbcToArrowConfig(allocator, calendar, false);
+    config = new JdbcToArrowConfig(allocator, calendar, false, null, null);
     assertFalse(config.shouldIncludeMetadata());
   }
+
+  @Test
+  public void testArraySubTypes() {
+    JdbcToArrowConfigBuilder builder = new JdbcToArrowConfigBuilder(allocator, 
calendar, false);
+    JdbcToArrowConfig config = builder.build();
+
+    final int columnIndex = 1;
+    final String columnName = "COLUMN";
+
+    assertNull(config.getArraySubTypeByColumnIndex(columnIndex));
+    assertNull(config.getArraySubTypeByColumnName(columnName));
+
+    final HashMap<Integer, JdbcFieldInfo> indexMapping = new HashMap<Integer, 
JdbcFieldInfo>();
+    indexMapping.put(2, new JdbcFieldInfo(Types.BIGINT));
+
+    final HashMap<String, JdbcFieldInfo> fieldMapping = new HashMap<String, 
JdbcFieldInfo>();
+    fieldMapping.put("NEW_COLUMN", new JdbcFieldInfo(Types.BINARY));
+
+    builder.setArraySubTypeByColumnIndexMap(indexMapping);
+    builder.setArraySubTypeByColumnNameMap(fieldMapping);
+    config = builder.build();
+
+    assertNull(config.getArraySubTypeByColumnIndex(columnIndex));
+    assertNull(config.getArraySubTypeByColumnName(columnName));
+
+    indexMapping.put(columnIndex, new JdbcFieldInfo(Types.BIT));
+    fieldMapping.put(columnName, new JdbcFieldInfo(Types.BLOB));
+
+    assertNotNull(config.getArraySubTypeByColumnIndex(columnIndex));
+    assertEquals(Types.BIT, 
config.getArraySubTypeByColumnIndex(columnIndex).getJdbcType());
+    assertEquals(Types.BLOB, 
config.getArraySubTypeByColumnName(columnName).getJdbcType());
+  }
 }
diff --git 
a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/Table.java 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/Table.java
index 98f799c..2137162 100644
--- a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/Table.java
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/Table.java
@@ -204,7 +204,7 @@ public class Table {
     this.rowCount = rowCount;
   }
 
-  private byte[][] getByteArray(String[] data) {
+  static byte[][] getByteArray(String[] data) {
     byte[][] byteArr = new byte[data.length][];
 
     for (int i = 0; i < data.length; i++) {
@@ -213,7 +213,7 @@ public class Table {
     return byteArr;
   }
 
-  private byte[][] getHexToByteArray(String[] data) {
+  static byte[][] getHexToByteArray(String[] data) {
     byte[][] byteArr = new byte[data.length][];
 
     for (int i = 0; i < data.length; i++) {
@@ -222,7 +222,7 @@ public class Table {
     return byteArr;
   }
 
-  private static byte[] hexStringToByteArray(String s) {
+  static byte[] hexStringToByteArray(String s) {
     int len = s.length();
     byte[] data = new byte[len / 2];
     for (int i = 0; i < len; i += 2) {
diff --git 
a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowArrayTest.java
 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowArrayTest.java
new file mode 100644
index 0000000..e0f8ad9
--- /dev/null
+++ 
b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/h2/JdbcToArrowArrayTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.h2;
+
+import static org.junit.Assert.*;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+import org.apache.arrow.adapter.jdbc.JdbcToArrow;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class JdbcToArrowArrayTest {
+  private Connection conn = null;
+
+  private static final String CREATE_STATEMENT =
+      "CREATE TABLE array_table (id INTEGER, int_array ARRAY, float_array 
ARRAY, string_array ARRAY);";
+  private static final String INSERT_STATEMENT =
+      "INSERT INTO array_table (id, int_array, float_array, string_array) 
VALUES (?, ?, ?, ?);";
+  private static final String QUERY = "SELECT int_array, float_array, 
string_array FROM array_table ORDER BY id;";
+  private static final String DROP_STATEMENT = "DROP TABLE array_table;";
+
+  private static Map<String, JdbcFieldInfo> arrayFieldMapping;
+
+  private static final String INT_ARRAY_FIELD_NAME = "INT_ARRAY";
+  private static final String FLOAT_ARRAY_FIELD_NAME = "FLOAT_ARRAY";
+  private static final String STRING_ARRAY_FIELD_NAME = "STRING_ARRAY";
+
+  @Before
+  public void setUp() throws Exception {
+    String url = "jdbc:h2:mem:JdbcToArrowTest";
+    String driver = "org.h2.Driver";
+    Class.forName(driver);
+    conn = DriverManager.getConnection(url);
+    try (Statement stmt = conn.createStatement()) {
+      stmt.executeUpdate(CREATE_STATEMENT);
+    }
+
+    arrayFieldMapping = new HashMap<String, JdbcFieldInfo>();
+    arrayFieldMapping.put(INT_ARRAY_FIELD_NAME, new 
JdbcFieldInfo(Types.INTEGER));
+    arrayFieldMapping.put(FLOAT_ARRAY_FIELD_NAME, new 
JdbcFieldInfo(Types.REAL));
+    arrayFieldMapping.put(STRING_ARRAY_FIELD_NAME, new 
JdbcFieldInfo(Types.VARCHAR));
+  }
+
+  // This test verifies reading an array field from an H2 database
+  // works as expected.  If this test fails, something is either wrong
+  // with the setup, or the H2 SQL behavior changed.
+  @Test
+  public void testReadH2Array() throws Exception {
+    int rowCount = 4;
+
+    Integer[][] intArrays = generateIntegerArrayField(rowCount);
+    Float[][] floatArrays = generateFloatArrayField(rowCount);
+    String[][] strArrays = generateStringArrayField(rowCount);
+
+    insertRows(rowCount, intArrays, floatArrays, strArrays);
+
+    try (ResultSet resultSet = conn.createStatement().executeQuery(QUERY)) {
+      ResultSetMetaData rsmd = resultSet.getMetaData();
+      assertEquals(3, rsmd.getColumnCount());
+
+      for (int i = 1; i <= rsmd.getColumnCount(); ++i) {
+        assertEquals(Types.ARRAY, rsmd.getColumnType(i));
+      }
+
+      int rowNum = 0;
+
+      while (resultSet.next()) {
+        Array intArray = resultSet.getArray(INT_ARRAY_FIELD_NAME);
+        assertFalse(resultSet.wasNull());
+        try (ResultSet rs = intArray.getResultSet()) {
+          int arrayIndex = 0;
+          while (rs.next()) {
+            assertEquals(intArrays[rowNum][arrayIndex].intValue(), 
rs.getInt(2));
+            ++arrayIndex;
+          }
+          assertEquals(intArrays[rowNum].length, arrayIndex);
+        }
+
+        Array floatArray = resultSet.getArray(FLOAT_ARRAY_FIELD_NAME);
+        assertFalse(resultSet.wasNull());
+        try (ResultSet rs = floatArray.getResultSet()) {
+          int arrayIndex = 0;
+          while (rs.next()) {
+            assertEquals(floatArrays[rowNum][arrayIndex].floatValue(), 
rs.getFloat(2), 0.001);
+            ++arrayIndex;
+          }
+          assertEquals(floatArrays[rowNum].length, arrayIndex);
+        }
+
+        Array strArray = resultSet.getArray(STRING_ARRAY_FIELD_NAME);
+        assertFalse(resultSet.wasNull());
+        try (ResultSet rs = strArray.getResultSet()) {
+          int arrayIndex = 0;
+          while (rs.next()) {
+            assertEquals(strArrays[rowNum][arrayIndex], rs.getString(2));
+            ++arrayIndex;
+          }
+          assertEquals(strArrays[rowNum].length, arrayIndex);
+        }
+
+        ++rowNum;
+      }
+
+      assertEquals(rowCount, rowNum);
+    }
+  }
+
+  @Test
+  public void testJdbcToArrow() throws Exception {
+    int rowCount = 4;
+
+    Integer[][] intArrays = generateIntegerArrayField(rowCount);
+    Float[][] floatArrays = generateFloatArrayField(rowCount);
+    String[][] strArrays = generateStringArrayField(rowCount);
+
+    insertRows(rowCount, intArrays, floatArrays, strArrays);
+
+    final JdbcToArrowConfigBuilder builder =
+        new JdbcToArrowConfigBuilder(new RootAllocator(Integer.MAX_VALUE), 
JdbcToArrowUtils.getUtcCalendar(), false);
+    builder.setArraySubTypeByColumnNameMap(arrayFieldMapping);
+
+    final JdbcToArrowConfig config = builder.build();
+
+    try (ResultSet resultSet = conn.createStatement().executeQuery(QUERY)) {
+      final VectorSchemaRoot vector = JdbcToArrow.sqlToArrow(resultSet, 
config);
+
+      assertEquals(rowCount, vector.getRowCount());
+
+      assertIntegerVectorEquals((ListVector) 
vector.getVector(INT_ARRAY_FIELD_NAME), rowCount, intArrays);
+      assertFloatVectorEquals((ListVector) 
vector.getVector(FLOAT_ARRAY_FIELD_NAME), rowCount, floatArrays);
+      assertStringVectorEquals((ListVector) 
vector.getVector(STRING_ARRAY_FIELD_NAME), rowCount, strArrays);
+    }
+  }
+
+  @Test
+  public void testJdbcToArrowWithNulls() throws Exception {
+    int rowCount = 4;
+
+    Integer[][] intArrays = {
+        null,
+        {0},
+        {1},
+        {},
+    };
+
+    Float[][] floatArrays = {
+        { 2.0f },
+        null,
+        { 3.0f },
+        {},
+    };
+
+    String[][] stringArrays = {
+        {"4"},
+        null,
+        {"5"},
+        {},
+    };
+
+    insertRows(rowCount, intArrays, floatArrays, stringArrays);
+
+    final JdbcToArrowConfigBuilder builder =
+        new JdbcToArrowConfigBuilder(new RootAllocator(Integer.MAX_VALUE), 
JdbcToArrowUtils.getUtcCalendar(), false);
+    builder.setArraySubTypeByColumnNameMap(arrayFieldMapping);
+
+    final JdbcToArrowConfig config = builder.build();
+
+    try (ResultSet resultSet = conn.createStatement().executeQuery(QUERY)) {
+      final VectorSchemaRoot vector = JdbcToArrow.sqlToArrow(resultSet, 
config);
+
+      assertEquals(rowCount, vector.getRowCount());
+
+      assertIntegerVectorEquals((ListVector) 
vector.getVector(INT_ARRAY_FIELD_NAME), rowCount, intArrays);
+      assertFloatVectorEquals((ListVector) 
vector.getVector(FLOAT_ARRAY_FIELD_NAME), rowCount, floatArrays);
+      assertStringVectorEquals((ListVector) 
vector.getVector(STRING_ARRAY_FIELD_NAME), rowCount, stringArrays);
+    }
+  }
+
+  private void assertIntegerVectorEquals(ListVector listVector, int rowCount, 
Integer[][] expectedValues) {
+    IntVector vector = (IntVector) listVector.getDataVector();
+    ArrowBuf offsetBuffer = listVector.getOffsetBuffer();
+
+    int prevOffset = 0;
+    for (int row = 0; row < rowCount; ++row) {
+      int offset = offsetBuffer.getInt((row + 1) * ListVector.OFFSET_WIDTH);
+
+      if (expectedValues[row] == null) {
+        assertEquals(0, listVector.isSet(row));
+        assertEquals(0, offset - prevOffset);
+        continue;
+      }
+
+      assertEquals(1, listVector.isSet(row));
+      assertEquals(expectedValues[row].length, offset - prevOffset);
+
+      for (int i = prevOffset; i < offset; ++i) {
+        assertEquals(expectedValues[row][i - prevOffset].intValue(), 
vector.get(i));
+      }
+
+      prevOffset = offset;
+    }
+  }
+
+  private void assertFloatVectorEquals(ListVector listVector, int rowCount, 
Float[][] expectedValues) {
+    Float4Vector vector = (Float4Vector) listVector.getDataVector();
+    ArrowBuf offsetBuffer = listVector.getOffsetBuffer();
+
+    int prevOffset = 0;
+    for (int row = 0; row < rowCount; ++row) {
+      int offset = offsetBuffer.getInt((row + 1) * ListVector.OFFSET_WIDTH);
+
+      if (expectedValues[row] == null) {
+        assertEquals(0, listVector.isSet(row));
+        assertEquals(0, offset - prevOffset);
+        continue;
+      }
+
+      assertEquals(1, listVector.isSet(row));
+      assertEquals(expectedValues[row].length, offset - prevOffset);
+
+      for (int i = prevOffset; i < offset; ++i) {
+        assertEquals(expectedValues[row][i - prevOffset].floatValue(), 
vector.get(i), 0);
+      }
+
+      prevOffset = offset;
+    }
+  }
+
+  private void assertStringVectorEquals(ListVector listVector, int rowCount, 
String[][] expectedValues) {
+    VarCharVector vector = (VarCharVector) listVector.getDataVector();
+    ArrowBuf offsetBuffer = listVector.getOffsetBuffer();
+
+    int prevOffset = 0;
+    for (int row = 0; row < rowCount; ++row) {
+      int offset = offsetBuffer.getInt((row + 1) * ListVector.OFFSET_WIDTH);
+
+      if (expectedValues[row] == null) {
+        assertEquals(0, listVector.isSet(row));
+        assertEquals(0, offset - prevOffset);
+        continue;
+      }
+
+      assertEquals(1, listVector.isSet(row));
+      assertEquals(expectedValues[row].length, offset - prevOffset);
+      for (int i = prevOffset; i < offset; ++i) {
+        assertArrayEquals(expectedValues[row][i - prevOffset].getBytes(), 
vector.get(i));
+      }
+
+      prevOffset = offset;
+    }
+  }
+
+  @After
+  public void tearDown() throws SQLException {
+    try (Statement stmt = conn.createStatement()) {
+      stmt.executeUpdate(DROP_STATEMENT);
+    } finally {
+      if (conn != null) {
+        conn.close();
+        conn = null;
+      }
+    }
+  }
+
+  private Integer[][] generateIntegerArrayField(int numRows) {
+    Integer[][] result = new Integer[numRows][];
+
+    for (int i = 0; i < numRows; ++i) {
+      int val = i * 4;
+      result[i] = new Integer[]{val, val + 1, val + 2, val + 3};
+    }
+
+    return result;
+  }
+
+  private Float[][] generateFloatArrayField(int numRows) {
+    Float[][] result = new Float[numRows][];
+  
+    for (int i = 0; i < numRows; ++i) {
+      int val = i * 4;
+      result[i] = new Float[]{(float) val, (float) val + 1, (float) val + 2, 
(float) val + 3};
+    }
+
+    return result;
+  }
+
+  private String[][] generateStringArrayField(int numRows) {
+    String[][] result = new String[numRows][];
+
+    for (int i = 0; i < numRows; ++i) {
+      int val = i * 4;
+      result[i] = new String[]{
+          String.valueOf(val),
+          String.valueOf(val + 1),
+          String.valueOf(val + 2),
+          String.valueOf(val + 3) };
+    }
+
+    return result;
+  }
+
+  private void insertRows(
+      int numRows,
+      Integer[][] integerArrays,
+      Float[][] floatArrays,
+      String[][] strArrays)
+          throws SQLException {
+
+    // Insert numRows rows, one per entry of the supplied arrays
+    try (PreparedStatement stmt = conn.prepareStatement(INSERT_STATEMENT)) {
+
+      for (int i = 0; i < numRows; ++i) {
+        Integer[] integerArray = integerArrays[i];
+        Float[] floatArray = floatArrays[i];
+        String[] strArray = strArrays[i];
+
+        Array intArray = conn.createArrayOf("INT", integerArray);
+        Array realArray = conn.createArrayOf("REAL", floatArray);
+        Array varcharArray = conn.createArrayOf("VARCHAR", strArray);
+
+        // Bind the row id and the three array values for this row
+        stmt.setInt(1, i);
+        stmt.setArray(2, intArray);
+        stmt.setArray(3, realArray);
+        stmt.setArray(4, varcharArray);
+
+        stmt.executeUpdate();
+
+        intArray.free();
+        realArray.free();
+        varcharArray.free();
+      }
+    }
+  }
+}

Reply via email to