rafael-telles commented on a change in pull request #10906:
URL: https://github.com/apache/arrow/pull/10906#discussion_r696049366



##########
File path: 
java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java
##########
@@ -0,0 +1,1733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.sql.example;
+
+import static com.google.common.base.Strings.emptyToNull;
+import static com.google.protobuf.Any.pack;
+import static com.google.protobuf.ByteString.copyFrom;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.isNull;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.StreamSupport.stream;
+import static 
org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator;
+import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema;
+import static org.apache.arrow.util.Preconditions.checkState;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
+import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
+import org.apache.arrow.flight.CallStatus;
+import org.apache.arrow.flight.Criteria;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.PutResult;
+import org.apache.arrow.flight.Result;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.flight.sql.FlightSqlProducer;
+import org.apache.arrow.flight.sql.impl.FlightSql;
+import 
org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest;
+import 
org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest;
+import 
org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSchemas;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables;
+import 
org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
+import 
org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery;
+import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt1Vector;
+import org.apache.arrow.vector.UInt2Vector;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.UInt8Vector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.DenseUnionVector;
+import org.apache.arrow.vector.holders.NullableIntHolder;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.apache.commons.dbcp2.ConnectionFactory;
+import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp2.PoolableConnection;
+import org.apache.commons.dbcp2.PoolableConnectionFactory;
+import org.apache.commons.dbcp2.PoolingDataSource;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.slf4j.Logger;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+
+/**
+ * Proof of concept {@link FlightSqlProducer} implementation showing an Apache 
Derby backed Flight SQL server capable
+ * of the following workflows:
+ * <!--
+ * TODO Revise summary: is it still matching?
+ * -->
+ * - returning a list of tables from the action `GetTables`.
+ * - creation of a prepared statement from the action 
`CreatePreparedStatement`.
+ * - execution of a prepared statement by using a {@link 
CommandPreparedStatementQuery}
+ * with {@link #getFlightInfo} and {@link #getStream}.
+ */
+public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
+  private static final IpcOption DEFAULT_OPTION = IpcOption.DEFAULT;
+  private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
+  private static final Logger LOGGER = getLogger(FlightSqlExample.class);
+  private static final Calendar DEFAULT_CALENDAR = 
JdbcToArrowUtils.getUtcCalendar();
+  private final Location location;
+  private final PoolingDataSource<PoolableConnection> dataSource;
+  private final BufferAllocator rootAllocator = new RootAllocator();
+  private final Cache<ByteString, StatementContext<PreparedStatement>> 
preparedStatementLoadingCache;
+  private final Cache<ByteString, StatementContext<Statement>> 
statementLoadingCache;
+  private final LoadingCache<ByteString, ResultSet> 
commandExecuteStatementLoadingCache;
+
+  /**
+   * Creates the example producer.
+   *
+   * <p>Construction resets the Derby database (removing any existing one and
+   * re-populating it), builds a pooled data source over {@code DATABASE_URI},
+   * and creates the three caches held by this instance. The precondition check
+   * fails when either the removal or the population step reports failure.
+   *
+   * @param location stored in the {@code location} field of this instance.
+   */
+  public FlightSqlExample(final Location location) {
+    // TODO Constructor should not be doing work.
+    checkState(
+        removeDerbyDatabaseIfExists() && populateDerbyDatabase(),
+        "Failed to reset Derby database!");
+    final ConnectionFactory connectionFactory =
+        new DriverManagerConnectionFactory(DATABASE_URI, new Properties());
+    final PoolableConnectionFactory poolableConnectionFactory =
+        new PoolableConnectionFactory(connectionFactory, null);
+    final ObjectPool<PoolableConnection> connectionPool = new 
GenericObjectPool<>(poolableConnectionFactory);
+
+    // The factory and the pool reference each other: the pool is built from the
+    // factory above, and the factory is then told which pool it belongs to.
+    poolableConnectionFactory.setPool(connectionPool);
+    // PoolingDataSource takes ownership of `connectionPool`
+    dataSource = new PoolingDataSource<>(connectionPool);
+
+    // Each cache below is capped at 100 entries, expires entries 10 minutes
+    // after they are written, and registers a removal listener.
+    preparedStatementLoadingCache =
+        CacheBuilder.newBuilder()
+            .maximumSize(100)
+            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .removalListener(new StatementRemovalListener<PreparedStatement>())
+            .build();
+
+    statementLoadingCache =
+        CacheBuilder.newBuilder()
+            .maximumSize(100)
+            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .removalListener(new StatementRemovalListener<>())
+            .build();
+
+    // The loader is handed `statementLoadingCache`; presumably it looks up the
+    // statement to execute there -- confirm in CommandExecuteStatementCacheLoader.
+    commandExecuteStatementLoadingCache =
+        CacheBuilder.newBuilder()
+            .maximumSize(100)
+            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .removalListener(new CommandExecuteStatementRemovalListener())
+            .build(new 
CommandExecuteStatementCacheLoader(statementLoadingCache));
+
+    this.location = location;
+  }
+
+  /**
+   * Deletes the Derby database directory {@code target/derbyDB}, if present.
+   *
+   * @return {@code true} when every file and directory was deleted, or when the
+   *     directory did not exist in the first place; {@code false} when a deletion
+   *     failed or any other {@link IOException} occurred (which is also logged).
+   */
+  private static boolean removeDerbyDatabaseIfExists() {
+    final Path derbyDirectory = Paths.get("target", "derbyDB");
+
+    try (final Stream<Path> tree = Files.walk(derbyDirectory)) {
+      /*
+       * Reverse ordering visits children before their parent directories, so
+       * directories are already empty when their turn comes. Every path is mapped
+       * to the outcome of its own deletion and the outcomes are AND-ed together.
+       * An empty stream is not expected and is reported as an `IOException`.
+       */
+      return tree
+          .sorted(Comparator.reverseOrder())
+          .map(entry -> entry.toFile().delete())
+          .reduce(Boolean::logicalAnd)
+          .orElseThrow(IOException::new);
+    } catch (final NoSuchFileException e) {
+      // Nothing to delete: a missing database is as good as a removed one.
+      return true;
+    } catch (final IOException e) {
+      LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e);
+      return false;
+    }
+  }
+
+  private static boolean populateDerbyDatabase() {
+    try (final Connection connection = 
DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true");
+         Statement statement = connection.createStatement()) {
+      statement.execute("CREATE TABLE foreignTable (" +
+          "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START 
WITH 1, INCREMENT BY 1), " +
+          "foreignName varchar(100), " +
+          "value int)");
+      statement.execute("CREATE TABLE intTable (" +
+          "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START 
WITH 1, INCREMENT BY 1), " +
+          "keyName varchar(100), " +
+          "value int, " +
+          "foreignId int references foreignTable(id))");
+      statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES 
('keyOne', 1)");
+      statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES 
('keyTwo', 0)");
+      statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES 
('keyThree', -1)");
+      statement.execute("INSERT INTO intTable (keyName, value, foreignId) 
VALUES ('one', 1, 1)");
+      statement.execute("INSERT INTO intTable (keyName, value, foreignId) 
VALUES ('zero', 0, 1)");
+      statement.execute("INSERT INTO intTable (keyName, value, foreignId) 
VALUES ('negative one', -1, 1)");
+    } catch (final SQLException e) {
+      LOGGER.error(format("Failed attempt to populate DerbyDB: <%s>", 
e.getMessage()), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Maps a JDBC column description to an Arrow type using the default
+   * JDBC-to-Arrow type converter and {@code DEFAULT_CALENDAR}.
+   *
+   * @param jdbcDataType the JDBC type code passed to {@link JdbcFieldInfo}.
+   * @param precision    the precision passed to {@link JdbcFieldInfo}.
+   * @param scale        the scale passed to {@link JdbcFieldInfo}.
+   * @return the converted type, or {@link ArrowType.Utf8#INSTANCE} when the
+   *     converter yields {@code null}.
+   */
+  private static ArrowType getArrowTypeFromJdbcType(final int jdbcDataType, final int precision, final int scale) {
+    final JdbcFieldInfo fieldInfo = new JdbcFieldInfo(jdbcDataType, precision, scale);
+    final ArrowType converted =
+        JdbcToArrowConfig.getDefaultJdbcToArrowTypeConverter().apply(fieldInfo, DEFAULT_CALENDAR);
+    if (isNull(converted)) {
+      return ArrowType.Utf8.INSTANCE;
+    }
+    return converted;
+  }
+
+  /**
+   * Writes a nullable byte into {@code vector} at {@code index}.
+   *
+   * @param data   the value to store; {@code null} marks the slot as null.
+   * @param vector the destination vector; must not be {@code null}.
+   * @param index  the destination slot; must not be negative.
+   */
+  private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) {
+    // Validate up front, exactly like the UInt4/VarChar/Int/VarBinary overloads do.
+    preconditionCheckSaveToVector(vector, index);
+    vectorConsumer(
+        data,
+        vector,
+        fieldVector -> fieldVector.setNull(index),
+        (theData, fieldVector) -> fieldVector.setSafe(index, theData));
+  }
+
+  /**
+   * Writes a string into a dense-union vector slot under the given type id.
+   *
+   * <p>A {@code null} value is ignored: the slot is left untouched.
+   *
+   * @param typeRegisteredId the union type id to record for the slot.
+   * @param data             the text to store, encoded as UTF-8.
+   * @param vector           the destination union vector.
+   * @param index            the destination slot.
+   */
+  private static void saveToVector(final byte typeRegisteredId, final String data,
+                                   final DenseUnionVector vector, final int index) {
+    vectorConsumer(
+        data,
+        vector,
+        fieldVector -> {
+          // Nothing.
+        },
+        (theData, fieldVector) -> {
+          // Size the buffer and the holder from the encoded bytes: String#length()
+          // counts UTF-16 code units, which differs from the UTF-8 byte count for
+          // any non-ASCII text. `theData` is never null on this branch.
+          final byte[] encoded = theData.getBytes(StandardCharsets.UTF_8);
+          final ArrowBuf buffer = fieldVector.getAllocator().buffer(encoded.length);
+          buffer.writeBytes(encoded);
+          final NullableVarCharHolder holder = new NullableVarCharHolder();
+          holder.buffer = buffer;
+          holder.end = encoded.length;
+          holder.isSet = 1;
+          fieldVector.setTypeId(index, typeRegisteredId);
+          fieldVector.setSafe(index, holder);
+          // NOTE(review): `buffer` is never released here; if setSafe copies the
+          // bytes (to be confirmed), it should be closed after the call.
+        });
+  }
+
+  /**
+   * Writes an integer into a dense-union vector slot under the given type id.
+   *
+   * <p>A {@code null} value is ignored: the slot is left untouched.
+   *
+   * @param typeRegisteredId the union type id to record for the slot.
+   * @param data             the value to store.
+   * @param vector           the destination union vector.
+   * @param index            the destination slot.
+   */
+  private static void saveToVector(final byte typeRegisteredId, final Integer data,
+                                   final DenseUnionVector vector, final int index) {
+    if (isNull(data)) {
+      // Nothing.
+      return;
+    }
+    final NullableIntHolder intHolder = new NullableIntHolder();
+    intHolder.value = data;
+    intHolder.isSet = 1;
+    vector.setTypeId(index, typeRegisteredId);
+    vector.setSafe(index, intHolder);
+  }
+
+  /**
+   * Writes a nullable integer into {@code vector} at {@code index}.
+   *
+   * @param data   the value to store; {@code null} marks the slot as null.
+   * @param vector the destination vector; must not be {@code null}.
+   * @param index  the destination slot; must not be negative.
+   */
+  private static void saveToVector(final Integer data, final UInt4Vector vector, final int index) {
+    preconditionCheckSaveToVector(vector, index);
+    if (isNull(data)) {
+      vector.setNull(index);
+    } else {
+      vector.setSafe(index, data);
+    }
+  }
+
+  /**
+   * Writes a nullable string into {@code vector} at {@code index}.
+   *
+   * @param data   the text to store, wrapped in a {@link Text}; {@code null} marks the slot as null.
+   * @param vector the destination vector; must not be {@code null}.
+   * @param index  the destination slot; must not be negative.
+   */
+  private static void saveToVector(final String data, final VarCharVector vector, final int index) {
+    preconditionCheckSaveToVector(vector, index);
+    if (isNull(data)) {
+      vector.setNull(index);
+    } else {
+      vector.setSafe(index, new Text(data));
+    }
+  }
+
+  /**
+   * Writes a nullable integer into {@code vector} at {@code index}.
+   *
+   * @param data   the value to store; {@code null} marks the slot as null.
+   * @param vector the destination vector; must not be {@code null}.
+   * @param index  the destination slot; must not be negative.
+   */
+  private static void saveToVector(final Integer data, final IntVector vector, final int index) {
+    preconditionCheckSaveToVector(vector, index);
+    if (isNull(data)) {
+      vector.setNull(index);
+    } else {
+      vector.setSafe(index, data);
+    }
+  }
+
+  /**
+   * Writes a nullable byte array into {@code vector} at {@code index}.
+   *
+   * @param data   the bytes to store; {@code null} marks the slot as null.
+   * @param vector the destination vector; must not be {@code null}.
+   * @param index  the destination slot; must not be negative.
+   */
+  private static void saveToVector(final byte[] data, final VarBinaryVector vector, final int index) {
+    preconditionCheckSaveToVector(vector, index);
+    if (isNull(data)) {
+      vector.setNull(index);
+    } else {
+      vector.setSafe(index, data);
+    }
+  }
+
+  /**
+   * Validates the arguments shared by the {@code saveToVector} overloads.
+   *
+   * @param vector the destination vector; rejected when {@code null}.
+   * @param index  the destination slot; rejected when negative (zero is valid).
+   */
+  private static void preconditionCheckSaveToVector(final FieldVector vector, final int index) {
+    Objects.requireNonNull(vector, "vector cannot be null.");
+    // Index 0 is accepted, so the message says "non-negative" rather than "positive".
+    checkState(index >= 0, "Index must be a non-negative number!");
+  }
+
+  /**
+   * Dispatches {@code data} to one of two callbacks depending on whether it is null.
+   *
+   * @param data               the value to inspect.
+   * @param vector             the vector handed to whichever callback runs.
+   * @param consumerIfNullable invoked with {@code vector} when {@code data} is {@code null}.
+   * @param defaultConsumer    invoked with {@code data} and {@code vector} otherwise.
+   * @param <T>                the value type.
+   * @param <V>                the vector type.
+   */
+  private static <T, V extends FieldVector> void vectorConsumer(final T data, final V vector,
+                                                                final Consumer<V> consumerIfNullable,
+                                                                final BiConsumer<T, V> defaultConsumer) {
+    if (!isNull(data)) {
+      defaultConsumer.accept(data, vector);
+    } else {
+      consumerIfNullable.accept(vector);
+    }
+  }
+
+  /**
+   * Builds a two-column root ({@code catalog_name}, {@code schema_name}) from the
+   * {@code TABLE_CATALOG} and {@code TABLE_SCHEM} columns of {@code data}.
+   *
+   * @param data      the result set to read; consumed to its end.
+   * @param allocator the allocator backing the created vectors.
+   * @return a root holding both vectors, one row per result-set row.
+   * @throws SQLException if reading {@code data} fails.
+   */
+  private static VectorSchemaRoot getSchemasRoot(final ResultSet data, final BufferAllocator allocator)
+      throws SQLException {
+    final VarCharVector catalogVector = new VarCharVector("catalog_name", allocator);
+    final VarCharVector schemaVector = new VarCharVector("schema_name", allocator);
+    final List<FieldVector> columns = ImmutableList.of(catalogVector, schemaVector);
+    for (final FieldVector column : columns) {
+      column.allocateNew();
+    }
+
+    final Map<FieldVector, String> columnNames = ImmutableMap.of(
+        catalogVector, "TABLE_CATALOG",
+        schemaVector, "TABLE_SCHEM");
+    saveToVectors(columnNames, data);
+
+    // All columns received the same row count; read it from the first one.
+    final int rowCount = catalogVector.getValueCount();
+    for (final FieldVector column : columns) {
+      column.setValueCount(rowCount);
+    }
+    return new VectorSchemaRoot(columns);
+  }
+
+  /**
+   * Copies every remaining row of {@code data} into the given vectors, one
+   * result-set column per vector, and sets each vector's value count.
+   *
+   * <p>Supported vector types are {@link VarCharVector}, {@link IntVector} and
+   * {@link UInt1Vector}; any other type aborts with an {@code INVALID_ARGUMENT}
+   * call status.
+   *
+   * @param vectorToColumnName maps each destination vector to its source column name.
+   * @param data               the result set to read; consumed to its end.
+   * @param emptyToNull        whether empty strings are stored as null in varchar vectors.
+   * @param <T>                the vector type.
+   * @return the number of rows read.
+   * @throws SQLException if reading {@code data} fails.
+   */
+  private static <T extends FieldVector> int saveToVectors(final Map<T, String> vectorToColumnName,
+                                                           final ResultSet data, boolean emptyToNull)
+      throws SQLException {
+    Objects.requireNonNull(vectorToColumnName, "vectorToColumnName cannot be null.");
+    Objects.requireNonNull(data, "data cannot be null.");
+    final Set<Entry<T, String>> mappings = vectorToColumnName.entrySet();
+    int rowIndex = 0;
+    while (data.next()) {
+      for (final Entry<T, String> mapping : mappings) {
+        final T target = mapping.getKey();
+        final String column = mapping.getValue();
+        if (target instanceof VarCharVector) {
+          final String raw = data.getString(column);
+          final String text = emptyToNull ? emptyToNull(raw) : raw;
+          saveToVector(text, (VarCharVector) target, rowIndex);
+        } else if (target instanceof IntVector) {
+          // getInt yields 0 for SQL NULL, so wasNull() must be consulted afterwards.
+          final int intValue = data.getInt(column);
+          saveToVector(data.wasNull() ? null : intValue, (IntVector) target, rowIndex);
+        } else if (target instanceof UInt1Vector) {
+          final byte byteValue = data.getByte(column);
+          saveToVector(data.wasNull() ? null : byteValue, (UInt1Vector) target, rowIndex);
+        } else {
+          throw CallStatus.INVALID_ARGUMENT.withDescription("Provided vector not supported").toRuntimeException();
+        }
+      }
+      rowIndex++;
+    }
+    for (final Entry<T, String> mapping : mappings) {
+      mapping.getKey().setValueCount(rowIndex);
+    }
+
+    return rowIndex;
+  }
+
+  /**
+   * Copies every remaining row of {@code data} into the given vectors, keeping
+   * empty strings as they are (delegates with {@code emptyToNull} set to
+   * {@code false}).
+   *
+   * @param vectorToColumnName maps each destination vector to its source column name.
+   * @param data               the result set to read.
+   * @param <T>                the vector type.
+   * @throws SQLException if reading {@code data} fails.
+   */
+  private static <T extends FieldVector> void saveToVectors(final Map<T, 
String> vectorToColumnName,
+                                                            final ResultSet 
data)
+      throws SQLException {
+    saveToVectors(vectorToColumnName, data, false);
+  }
+
+  /**
+   * Builds a single-column root named {@code table_type} from the
+   * {@code TABLE_TYPE} column of {@code data}.
+   *
+   * @param data      the result set to read.
+   * @param allocator the allocator backing the created vector.
+   * @return the populated root.
+   * @throws SQLException if reading {@code data} fails.
+   */
+  private static VectorSchemaRoot getTableTypesRoot(final ResultSet data, 
final BufferAllocator allocator)
+      throws SQLException {
+    return getRoot(data, allocator, "table_type", "TABLE_TYPE");
+  }
+
+  /**
+   * Builds a single-column root named {@code catalog_name} from the
+   * {@code TABLE_CATALOG} column of {@code data}.
+   *
+   * @param data      the result set to read.
+   * @param allocator the allocator backing the created vector.
+   * @return the populated root.
+   * @throws SQLException if reading {@code data} fails.
+   */
+  private static VectorSchemaRoot getCatalogsRoot(final ResultSet data, final 
BufferAllocator allocator)
+      throws SQLException {
+    return getRoot(data, allocator, "catalog_name", "TABLE_CATALOG");
+  }
+
+  /**
+   * Builds a single-column varchar root from one column of a result set.
+   *
+   * @param data            the result set to read; consumed to its end.
+   * @param allocator       the allocator backing the created vector.
+   * @param fieldVectorName the name given to the created vector.
+   * @param columnName      the result-set column to copy from.
+   * @return a root holding the populated vector.
+   * @throws SQLException if reading {@code data} fails; the vector is released first.
+   */
+  private static VectorSchemaRoot getRoot(final ResultSet data, final BufferAllocator allocator,
+                                          final String fieldVectorName, final String columnName)
+      throws SQLException {
+    final VarCharVector dataVector = new VarCharVector(fieldVectorName, allocator);
+    try {
+      // saveToVectors already sets the final value count on the vector, so no
+      // further setValueCount call is needed here.
+      saveToVectors(ImmutableMap.of(dataVector, columnName), data);
+    } catch (final SQLException | RuntimeException e) {
+      // Nobody else holds the vector yet: release its buffers before propagating.
+      dataVector.close();
+      throw e;
+    }
+    return new VectorSchemaRoot(singletonList(dataVector));
+  }
+
+  private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData 
databaseMetaData,
+                                                final BufferAllocator 
allocator,
+                                                final boolean includeSchema,
+                                                final String catalog,
+                                                final String 
schemaFilterPattern,
+                                                final String 
tableFilterPattern,
+                                                final String... tableTypes)
+      throws SQLException, IOException {
+    /*
+     * TODO Fix DerbyDB inconsistency if possible.
+     * During the early development of this prototype, an inconsistency has 
been found in the database
+     * used for this demonstration; as DerbyDB does not operate with the 
concept of catalogs, fetching
+     * the catalog name for a given table from `DatabaseMetadata#getColumns` 
and `DatabaseMetadata#getSchemas`
+     * returns null, as expected. However, the inconsistency lies in the fact 
that accessing the same
+     * information -- that is, the catalog name for a given table -- from 
`DatabaseMetadata#getSchemas`
+     * returns an empty String. The temporary workaround for this was making sure we convert the empty Strings
+     * to null using `com.google.common.base.Strings#emptyToNull`.
+     */
+    Objects.requireNonNull(allocator, "BufferAllocator cannot be null.");
+    final VarCharVector catalogNameVector = new VarCharVector("catalog_name", 
allocator);
+    final VarCharVector schemaNameVector = new VarCharVector("schema_name", 
allocator);
+    final VarCharVector tableNameVector = new VarCharVector("table_name", 
allocator);
+    final VarCharVector tableTypeVector = new VarCharVector("table_type", 
allocator);
+
+    final List<FieldVector> vectors =
+        new ArrayList<>(

Review comment:
       In this particular method - yes.
   If `includeSchema == true` then the VectorSchemaRoot will have an additional 
`table_schema` vector




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to