rafael-telles commented on a change in pull request #10906: URL: https://github.com/apache/arrow/pull/10906#discussion_r696049366
########## File path: java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java ########## @@ -0,0 +1,1733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.sql.example; + +import static com.google.common.base.Strings.emptyToNull; +import static com.google.protobuf.Any.pack; +import static com.google.protobuf.ByteString.copyFrom; +import static java.lang.String.format; +import static java.util.Collections.singletonList; +import static java.util.Objects.isNull; +import static java.util.UUID.randomUUID; +import static java.util.stream.StreamSupport.stream; +import static org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator; +import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema; +import static org.apache.arrow.util.Preconditions.checkState; +import static org.slf4j.LoggerFactory.getLogger; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DatabaseMetaData; 
+import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; +import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; +import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; +import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.Criteria; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightStream; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.SchemaResult; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.FlightSqlProducer; +import org.apache.arrow.flight.sql.impl.FlightSql; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest; +import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs; +import 
org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSchemas; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery; +import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.Decimal256Vector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeMilliVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeSecVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampSecTZVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt1Vector; +import 
org.apache.arrow.vector.UInt2Vector; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.complex.DenseUnionVector; +import org.apache.arrow.vector.holders.NullableIntHolder; +import org.apache.arrow.vector.holders.NullableVarCharHolder; +import org.apache.arrow.vector.ipc.message.IpcOption; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.apache.commons.dbcp2.ConnectionFactory; +import org.apache.commons.dbcp2.DriverManagerConnectionFactory; +import org.apache.commons.dbcp2.PoolableConnection; +import org.apache.commons.dbcp2.PoolableConnectionFactory; +import org.apache.commons.dbcp2.PoolingDataSource; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.impl.GenericObjectPool; +import org.slf4j.Logger; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.ProtocolStringList; + +/** + * Proof of concept {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL 
server capable + * of the following workflows: + * <!-- + * TODO Revise summary: is it still matching? + * --> + * - returning a list of tables from the action `GetTables`. + * - creation of a prepared statement from the action `CreatePreparedStatement`. + * - execution of a prepared statement by using a {@link CommandPreparedStatementQuery} + * with {@link #getFlightInfo} and {@link #getStream}. + */ +public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { + private static final IpcOption DEFAULT_OPTION = IpcOption.DEFAULT; + private static final String DATABASE_URI = "jdbc:derby:target/derbyDB"; + private static final Logger LOGGER = getLogger(FlightSqlExample.class); + private static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar(); + private final Location location; + private final PoolingDataSource<PoolableConnection> dataSource; + private final BufferAllocator rootAllocator = new RootAllocator(); + private final Cache<ByteString, StatementContext<PreparedStatement>> preparedStatementLoadingCache; + private final Cache<ByteString, StatementContext<Statement>> statementLoadingCache; + private final LoadingCache<ByteString, ResultSet> commandExecuteStatementLoadingCache; + + public FlightSqlExample(final Location location) { + // TODO Constructor should not be doing work. 
+ checkState( + removeDerbyDatabaseIfExists() && populateDerbyDatabase(), + "Failed to reset Derby database!"); + final ConnectionFactory connectionFactory = + new DriverManagerConnectionFactory(DATABASE_URI, new Properties()); + final PoolableConnectionFactory poolableConnectionFactory = + new PoolableConnectionFactory(connectionFactory, null); + final ObjectPool<PoolableConnection> connectionPool = new GenericObjectPool<>(poolableConnectionFactory); + + poolableConnectionFactory.setPool(connectionPool); + // PoolingDataSource takes ownership of `connectionPool` + dataSource = new PoolingDataSource<>(connectionPool); + + preparedStatementLoadingCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener(new StatementRemovalListener<PreparedStatement>()) + .build(); + + statementLoadingCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener(new StatementRemovalListener<>()) + .build(); + + commandExecuteStatementLoadingCache = + CacheBuilder.newBuilder() + .maximumSize(100) + .expireAfterWrite(10, TimeUnit.MINUTES) + .removalListener(new CommandExecuteStatementRemovalListener()) + .build(new CommandExecuteStatementCacheLoader(statementLoadingCache)); + + this.location = location; + } + + private static boolean removeDerbyDatabaseIfExists() { + boolean wasSuccess; + final Path path = Paths.get("target" + File.separator + "derbyDB"); + + try (final Stream<Path> walk = Files.walk(path)) { + /* + * Iterate over all paths to delete, mapping each path to the outcome of its own + * deletion as a boolean representing whether or not each individual operation was + * successful; then reduce all booleans into a single answer, and store that into + * `wasSuccess`, which will later be returned by this method. + * If for whatever reason the resulting `Stream<Boolean>` is empty, throw an `IOException`; + * this not expected. 
+ */ + wasSuccess = walk.sorted(Comparator.reverseOrder()).map(Path::toFile).map(File::delete) + .reduce(Boolean::logicalAnd).orElseThrow(IOException::new); + } catch (IOException e) { + /* + * The only acceptable scenario for an `IOException` to be thrown here is if + * an attempt to delete an non-existing file takes place -- which should be + * alright, since they would be deleted anyway. + */ + if (!(wasSuccess = e instanceof NoSuchFileException)) { + LOGGER.error(format("Failed attempt to clear DerbyDB: <%s>", e.getMessage()), e); + } + } + + return wasSuccess; + } + + private static boolean populateDerbyDatabase() { + try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true"); + Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE foreignTable (" + + "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + "foreignName varchar(100), " + + "value int)"); + statement.execute("CREATE TABLE intTable (" + + "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), " + + "keyName varchar(100), " + + "value int, " + + "foreignId int references foreignTable(id))"); + statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES ('keyOne', 1)"); + statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES ('keyTwo', 0)"); + statement.execute("INSERT INTO foreignTable (foreignName, value) VALUES ('keyThree', -1)"); + statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('one', 1, 1)"); + statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('zero', 0, 1)"); + statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('negative one', -1, 1)"); + } catch (final SQLException e) { + LOGGER.error(format("Failed attempt to populate DerbyDB: <%s>", e.getMessage()), e); + return false; + } + return true; + } + + private static ArrowType 
getArrowTypeFromJdbcType(final int jdbcDataType, final int precision, final int scale) { + final ArrowType type = + JdbcToArrowConfig.getDefaultJdbcToArrowTypeConverter().apply(new JdbcFieldInfo(jdbcDataType, precision, scale), + DEFAULT_CALENDAR); + return isNull(type) ? ArrowType.Utf8.INSTANCE : type; + } + + private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) { + vectorConsumer( + data, + vector, + fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, theData)); + } + + private static void saveToVector(final byte typeRegisteredId, final String data, + final DenseUnionVector vector, final int index) { + vectorConsumer( + data, + vector, + fieldVector -> { + // Nothing. + }, + (theData, fieldVector) -> { + final String effectiveData = (isNull(data)) ? "" : data; + final NullableVarCharHolder holder = new NullableVarCharHolder(); + final int dataLength = effectiveData.length(); + final ArrowBuf buffer = fieldVector.getAllocator().buffer(dataLength); + buffer.writeBytes(effectiveData.getBytes(StandardCharsets.UTF_8)); + holder.buffer = buffer; + holder.end = dataLength; + holder.isSet = 1; + fieldVector.setTypeId(index, typeRegisteredId); + fieldVector.setSafe(index, holder); + }); + } + + private static void saveToVector(final byte typeRegisteredId, final Integer data, + final DenseUnionVector vector, final int index) { + vectorConsumer( + data, + vector, + fieldVector -> { + // Nothing. + }, + (theData, fieldVector) -> { + final NullableIntHolder holder = new NullableIntHolder(); + holder.value = isNull(data) ? 
0 : data; + holder.isSet = 1; + fieldVector.setTypeId(index, typeRegisteredId); + fieldVector.setSafe(index, holder); + }); + } + + private static void saveToVector(final Integer data, final UInt4Vector vector, final int index) { + preconditionCheckSaveToVector(vector, index); + vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, data)); + } + + private static void saveToVector(final String data, final VarCharVector vector, final int index) { + preconditionCheckSaveToVector(vector, index); + vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, new Text(theData))); + } + + private static void saveToVector(final Integer data, final IntVector vector, final int index) { + preconditionCheckSaveToVector(vector, index); + vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, theData)); + } + + private static void saveToVector(final byte[] data, final VarBinaryVector vector, final int index) { + preconditionCheckSaveToVector(vector, index); + vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, theData)); + } + + private static void preconditionCheckSaveToVector(final FieldVector vector, final int index) { + Objects.requireNonNull(vector, "vector cannot be null."); + checkState(index >= 0, "Index must be a positive number!"); + } + + private static <T, V extends FieldVector> void vectorConsumer(final T data, final V vector, + final Consumer<V> consumerIfNullable, + final BiConsumer<T, V> defaultConsumer) { + if (isNull(data)) { + consumerIfNullable.accept(vector); + return; + } + defaultConsumer.accept(data, vector); + } + + private static VectorSchemaRoot getSchemasRoot(final ResultSet data, final BufferAllocator allocator) + throws SQLException { + final VarCharVector catalogs = 
new VarCharVector("catalog_name", allocator); + final VarCharVector schemas = new VarCharVector("schema_name", allocator); + final List<FieldVector> vectors = ImmutableList.of(catalogs, schemas); + vectors.forEach(FieldVector::allocateNew); + final Map<FieldVector, String> vectorToColumnName = ImmutableMap.of( + catalogs, "TABLE_CATALOG", + schemas, "TABLE_SCHEM"); + saveToVectors(vectorToColumnName, data); + final int rows = vectors.stream().map(FieldVector::getValueCount).findAny().orElseThrow(IllegalStateException::new); + vectors.forEach(vector -> vector.setValueCount(rows)); + return new VectorSchemaRoot(vectors); + } + + private static <T extends FieldVector> int saveToVectors(final Map<T, String> vectorToColumnName, + final ResultSet data, boolean emptyToNull) + throws SQLException { + Objects.requireNonNull(vectorToColumnName, "vectorToColumnName cannot be null."); + Objects.requireNonNull(data, "data cannot be null."); + final Set<Entry<T, String>> entrySet = vectorToColumnName.entrySet(); + int rows = 0; + for (; data.next(); rows++) { + for (final Entry<T, String> vectorToColumn : entrySet) { + final T vector = vectorToColumn.getKey(); + final String columnName = vectorToColumn.getValue(); + if (vector instanceof VarCharVector) { + String thisData = data.getString(columnName); + saveToVector(emptyToNull ? emptyToNull(thisData) : thisData, (VarCharVector) vector, rows); + continue; + } else if (vector instanceof IntVector) { + final int intValue = data.getInt(columnName); + saveToVector(data.wasNull() ? null : intValue, (IntVector) vector, rows); + continue; + } else if (vector instanceof UInt1Vector) { + final byte byteValue = data.getByte(columnName); + saveToVector(data.wasNull() ? 
null : byteValue, (UInt1Vector) vector, rows); + continue; + } + throw CallStatus.INVALID_ARGUMENT.withDescription("Provided vector not supported").toRuntimeException(); + } + } + for (final Entry<T, String> vectorToColumn : entrySet) { + vectorToColumn.getKey().setValueCount(rows); + } + + return rows; + } + + private static <T extends FieldVector> void saveToVectors(final Map<T, String> vectorToColumnName, + final ResultSet data) + throws SQLException { + saveToVectors(vectorToColumnName, data, false); + } + + private static VectorSchemaRoot getTableTypesRoot(final ResultSet data, final BufferAllocator allocator) + throws SQLException { + return getRoot(data, allocator, "table_type", "TABLE_TYPE"); + } + + private static VectorSchemaRoot getCatalogsRoot(final ResultSet data, final BufferAllocator allocator) + throws SQLException { + return getRoot(data, allocator, "catalog_name", "TABLE_CATALOG"); + } + + private static VectorSchemaRoot getRoot(final ResultSet data, final BufferAllocator allocator, + final String fieldVectorName, final String columnName) + throws SQLException { + final VarCharVector dataVector = new VarCharVector(fieldVectorName, allocator); + saveToVectors(ImmutableMap.of(dataVector, columnName), data); + final int rows = dataVector.getValueCount(); + dataVector.setValueCount(rows); + return new VectorSchemaRoot(singletonList(dataVector)); + } + + private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMetaData, + final BufferAllocator allocator, + final boolean includeSchema, + final String catalog, + final String schemaFilterPattern, + final String tableFilterPattern, + final String... tableTypes) + throws SQLException, IOException { + /* + * TODO Fix DerbyDB inconsistency if possible. 
+ * During the early development of this prototype, an inconsistency has been found in the database + * used for this demonstration; as DerbyDB does not operate with the concept of catalogs, fetching + * the catalog name for a given table from `DatabaseMetaData#getColumns` and `DatabaseMetaData#getSchemas` + * returns null, as expected. However, the inconsistency lies in the fact that accessing the same + * information -- that is, the catalog name for a given table -- from `DatabaseMetaData#getSchemas` + * returns an empty String. The temporary workaround for this was making sure we convert the empty Strings + * to null using `com.google.common.base.Strings#emptyToNull`. + */ + Objects.requireNonNull(allocator, "BufferAllocator cannot be null."); + final VarCharVector catalogNameVector = new VarCharVector("catalog_name", allocator); + final VarCharVector schemaNameVector = new VarCharVector("schema_name", allocator); + final VarCharVector tableNameVector = new VarCharVector("table_name", allocator); + final VarCharVector tableTypeVector = new VarCharVector("table_type", allocator); + + final List<FieldVector> vectors = + new ArrayList<>( Review comment: In this particular method, yes. If `includeSchema == true`, then the VectorSchemaRoot will have an additional `table_schema` vector. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
