paul-rogers commented on a change in pull request #2332: URL: https://github.com/apache/drill/pull/2332#discussion_r757120210
########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.util.List; +import java.util.Objects; + +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("phoenix-scan") +public class PhoenixGroupScan extends AbstractGroupScan { + + private final String sql; + private final List<SchemaPath> columns; + private final PhoenixScanSpec scanSpec; + private final double rows; + private final ScanStats scanStats; + private final PhoenixStoragePlugin plugin; + + private int hashCode; + + @JsonCreator + public PhoenixGroupScan( + @JsonProperty("sql") String sql, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("scanSpec") PhoenixScanSpec scanSpec, + @JsonProperty("rows") double rows, + @JsonProperty("config") PhoenixStoragePluginConfig config, + @JacksonInject StoragePluginRegistry plugins) { + super("no-user"); + this.sql = sql; + this.columns = columns; + this.scanSpec = scanSpec; + this.rows = rows; + this.scanStats = computeScanStats(); + this.plugin = plugins.resolve(config, PhoenixStoragePlugin.class); + } + + public PhoenixGroupScan(PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) { + super("no-user"); + this.sql = scanSpec.getSql(); + this.columns = ALL_COLUMNS; + this.scanSpec = scanSpec; + this.rows = 100; Review comment: The row count is used, I believe, to plan joins. Is 100 a good estimate? A number this lows suggests to Drill that it can ship the results to all nodes as part of a broadcast join. If the actual number of rows is 1M or 100M, that will have turned out to be a poor choice. No good solution here: you actually don't know the number of rows... ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixScanBatchCreator.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.util.List; + +import org.apache.drill.common.exceptions.ChildErrorContext; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.server.options.OptionManager; + +public class PhoenixScanBatchCreator implements BatchCreator<PhoenixSubScan> { + + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, PhoenixSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { + try { + ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan); + return builder.buildScanOperator(context, subScan); + } catch (UserException e) { + throw e; + } catch (Throwable e) { + throw new ExecutionSetupException(e); + } + } + + private ScanFrameworkBuilder createBuilder(OptionManager options, PhoenixSubScan subScan) { + ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); + builder.projection(subScan.getColumns()); + builder.setUserName(subScan.getUserName()); + // Phoenix reader + ReaderFactory readerFactory = new PhoenixReaderFactory(subScan); + builder.setReaderFactory(readerFactory); + builder.nullType(Types.optional(MinorType.VARCHAR)); + // Add custom error context + builder.errorContext(new ChildErrorContext(builder.errorContext()) { + @Override + public void addContext(UserException.Builder builder) { + builder.addContext("Database : ", subScan.getScanSpec().getDbName()); + builder.addContext("Table : ", subScan.getScanSpec().getTableName()); + } + }); + + return builder; + } + + private static class PhoenixReaderFactory implements ReaderFactory { + + private final PhoenixSubScan subScan; + private int count = 0; Review comment: Nit: initializer not needed: default is already 0. ########## File path: contrib/storage-phoenix/src/main/resources/bootstrap-storage-plugins.json ########## @@ -0,0 +1,14 @@ +{ + "storage": { + "phoenix": { + "type": "phoenix", + "jdbcURL": "jdbc:phoenix:thin:url=http://the.queryserver.hostname:8765;serialization=PROTOBUF", Review comment: Should this point to localhost with the default port? To allow the simplest config to work out-of-the-box? ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.slf4j.LoggerFactory; + +public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class); + + private final PhoenixSubScan subScan; + private CustomErrorContext errorContext; + private PhoenixReader reader; + private Connection conn; + private PreparedStatement pstmt; + private ResultSet rs; + private ResultSetMetaData meta; + private ColumnDefn[] columns; + private Stopwatch watch; + private int count = 0; + + public PhoenixBatchReader(PhoenixSubScan subScan) { + this.subScan = subScan; + } + + @Override + public boolean open(SchemaNegotiator negotiator) { + try { + errorContext = negotiator.parentErrorContext(); + conn = subScan.getPlugin().getDataSource().getConnection(); + pstmt = conn.prepareStatement(subScan.getSql()); + rs = pstmt.executeQuery(); + meta = pstmt.getMetaData(); + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to execute the phoenix sql query. " + e.getMessage()) + .build(logger); + } + try { + negotiator.tableSchema(defineMetadata(), true); + reader = new PhoenixReader(negotiator.build()); + bindColumns(reader.getStorage()); + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to get type of columns from metadata. " + e.getMessage()) + .build(logger); + } + watch = Stopwatch.createStarted(); + return true; + } + + @Override + public boolean next() { + try { + while (rs.next()) { + { // TODO refactor this to PhoenixReader + reader.getStorage().start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { + Array result = rs.getArray(index + 1); + if (result != null) { + columns[index].load(result.getArray()); + } + } else { + columns[index].load(rs.getObject(index + 1)); + } + } + count++; + reader.getStorage().save(); + } + if (reader.getStorage().isFull()) { // batch full but not reached the EOF + return true; + } + } + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to get the data from the result set. " + e.getMessage()) + .build(logger); + } + watch.stop(); + logger.debug("Phoenix fetch total record numbers : {}", count); + return false; // the EOF is reached. + } + + @Override + public void close() { + count = reader.getStorage().loader().batchCount(); + logger.debug("Phoenix fetch batch size : {}, took {} ms. ", count, watch.elapsed(TimeUnit.MILLISECONDS)); + AutoCloseables.closeSilently(rs, pstmt, conn); + } + + private TupleMetadata defineMetadata() throws SQLException { + List<SchemaPath> cols = subScan.getColumns(); + columns = new ColumnDefn[cols.size()]; + SchemaBuilder builder = new SchemaBuilder(); + for (int index = 0; index < cols.size(); index++) { + int sqlType = meta.getColumnType(index + 1); // column the first column is 1 + String columnName = cols.get(index).rootName(); + columns[index] = makeColumn(columnName, sqlType, meta.getColumnTypeName(index + 1), index); + columns[index].define(builder); + } + return builder.buildSchema(); + } + + private ColumnDefn makeColumn(String name, int sqlType, String baseType, int index) { + if (sqlType == Types.ARRAY) { + return new ArrayDefn(name, sqlType, baseType, index); + } + return new GenericDefn(name, sqlType, index); + } + + private void bindColumns(RowSetLoader loader) { + for (int i = 0; i < columns.length; i++) { + columns[i].bind(loader); + } + } + + protected static final Map<Integer, MinorType> COLUMN_TYPE_MAP = Maps.newHashMap(); + + static { + // text + COLUMN_TYPE_MAP.put(Types.VARCHAR, MinorType.VARCHAR); + COLUMN_TYPE_MAP.put(Types.CHAR, MinorType.VARCHAR); + // numbers + COLUMN_TYPE_MAP.put(Types.BIGINT, MinorType.BIGINT); + COLUMN_TYPE_MAP.put(Types.INTEGER, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.SMALLINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.TINYINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.DOUBLE, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.FLOAT, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.DECIMAL, MinorType.VARDECIMAL); + // time + COLUMN_TYPE_MAP.put(Types.DATE, MinorType.DATE); + COLUMN_TYPE_MAP.put(Types.TIME, MinorType.TIME); + COLUMN_TYPE_MAP.put(Types.TIMESTAMP, MinorType.TIMESTAMP); + // binary + COLUMN_TYPE_MAP.put(Types.BINARY, MinorType.VARBINARY); // Raw fixed length byte array. Mapped to byte[]. + COLUMN_TYPE_MAP.put(Types.VARBINARY, MinorType.VARBINARY); // Raw variable length byte array. + // boolean + COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT); + } + + public abstract static class ColumnDefn { + + final String name; + final int index; + final int sqlType; + ColumnWriter writer; + + public String getName() { + return name; + } + + public int getIndex() { + return index; + } + + public int getSqlType() { + return sqlType; + } + + public ColumnDefn(String name, int sqlType, int index) { + this.name = name; + this.sqlType = sqlType; + this.index = index; + } + + public void define(SchemaBuilder builder) { + builder.addNullable(getName(), COLUMN_TYPE_MAP.get(getSqlType())); + } + + public void bind(RowSetLoader loader) { + writer = loader.scalar(getName()); + } + + public abstract void load(Object value); + } + + public static class GenericDefn extends ColumnDefn { + + public GenericDefn(String name, int sqlType, int index) { + super(name, sqlType, index); + } + + @Override + public void load(Object value) { // TODO refactor this to AbstractScalarWriter + ScalarWriter scalarWriter = (ScalarWriter) writer; + switch (getSqlType()) { + case Types.VARCHAR: + case Types.CHAR: + scalarWriter.setString((String) value); + break; + case Types.BIGINT : + scalarWriter.setLong((Long) value); + break; + case Types.INTEGER : + scalarWriter.setInt((Integer) value); + break; + case Types.SMALLINT : + scalarWriter.setInt((Short) value); + break; + case Types.TINYINT : + scalarWriter.setInt((Byte) value); + break; + case Types.DOUBLE : + case Types.FLOAT : + scalarWriter.setDouble((Double) value); + break; + case Types.DECIMAL : + scalarWriter.setDecimal((BigDecimal) value); + break; + case Types.DATE : + scalarWriter.setDate(((Date) value).toLocalDate()); + break; + case Types.TIME : + scalarWriter.setTime(((Time) value).toLocalTime()); + break; + case Types.TIMESTAMP : + scalarWriter.setTimestamp(((Timestamp) value).toInstant()); + break; + case Types.BINARY : + case Types.VARBINARY : + byte[] byteValue = (byte[]) value; + scalarWriter.setBytes(byteValue, byteValue.length); + break; + case Types.BOOLEAN : + scalarWriter.setBoolean((Boolean) value); + break; + default: + break; + } + } + } + + public static class ArrayDefn extends ColumnDefn { + + final String VARCHAR = "VARCHAR ARRAY"; + final String CHAR = "CHAR ARRAY"; + final String BIGINT = "BIGINT ARRAY"; + final String INTEGER = "INTEGER ARRAY"; + final String DOUBLE = "DOUBLE ARRAY"; + final String FLOAT = "FLOAT ARRAY"; + final String SMALLINT = "SMALLINT ARRAY"; + final String TINYINT = "TINYINT ARRAY"; + final String BOOLEAN = "BOOLEAN ARRAY"; + + final String baseType; + + public ArrayDefn(String name, int sqlType, String baseType, int index) { + super(name, sqlType, index); + this.baseType = baseType; + } + + @Override + public void define(SchemaBuilder builder) { + switch (baseType) { + case VARCHAR: + case CHAR: + builder.addArray(getName(), MinorType.VARCHAR); + break; + case BIGINT: + builder.addArray(getName(), MinorType.BIGINT); + break; + case INTEGER: + builder.addArray(getName(), MinorType.INT); + break; + case DOUBLE: + case FLOAT: + builder.addArray(getName(), MinorType.FLOAT8); + break; + case SMALLINT: + builder.addArray(getName(), MinorType.SMALLINT); + break; + case TINYINT: + builder.addArray(getName(), MinorType.TINYINT); + break; + case BOOLEAN: + builder.addArray(getName(), MinorType.BIT); + break; + default: + break; + } + } + + @Override + public void bind(RowSetLoader loader) { + writer = loader.array(getName()); + } + + @Override + public void load(Object value) { Review comment: As above: try to avoid a switch in the inner-most loop. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixGroupScan.java ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.util.List; +import java.util.Objects; + +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("phoenix-scan") +public class PhoenixGroupScan extends AbstractGroupScan { + + private final String sql; + private final List<SchemaPath> columns; + private final PhoenixScanSpec scanSpec; + private final double rows; + private final ScanStats scanStats; + private final PhoenixStoragePlugin plugin; + + private int hashCode; + + @JsonCreator + public PhoenixGroupScan( + @JsonProperty("sql") String sql, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("scanSpec") PhoenixScanSpec scanSpec, + @JsonProperty("rows") double rows, + @JsonProperty("config") PhoenixStoragePluginConfig config, + @JacksonInject StoragePluginRegistry plugins) { + super("no-user"); + this.sql = sql; + this.columns = columns; + this.scanSpec = scanSpec; + this.rows = rows; + this.scanStats = computeScanStats(); + this.plugin = plugins.resolve(config, PhoenixStoragePlugin.class); + } + + public PhoenixGroupScan(PhoenixScanSpec scanSpec, PhoenixStoragePlugin plugin) { + super("no-user"); + this.sql = scanSpec.getSql(); + this.columns = ALL_COLUMNS; + this.scanSpec = scanSpec; + this.rows = 100; + this.scanStats = computeScanStats(); + this.plugin = plugin; + } + + public PhoenixGroupScan(PhoenixGroupScan scan) { + super(scan); + this.sql = scan.sql; + this.columns = scan.columns; + this.scanSpec = scan.scanSpec; + this.rows = scan.rows; + this.scanStats = scan.scanStats; + this.plugin = scan.plugin; + } + + public PhoenixGroupScan(PhoenixGroupScan scan, List<SchemaPath> columns) { + super(scan); + this.sql = scan.sql; + this.columns = columns; + this.scanSpec = scan.scanSpec; + this.rows = scan.rows; + this.scanStats = scan.scanStats; + this.plugin = scan.plugin; + } + + public PhoenixGroupScan(String sql, List<SchemaPath> columns, PhoenixScanSpec scanSpec, double rows, PhoenixStoragePlugin plugin) { + super("no-user"); + this.sql = sql; + this.columns = columns; + this.scanSpec = scanSpec; + this.rows = rows; + this.scanStats = computeScanStats(); + this.plugin = plugin; + } + + @JsonProperty("sql") + public String sql() { + return sql; + } + + @JsonProperty("columns") + public List<SchemaPath> columns() { + return columns; + } + + @JsonProperty("scanSpec") + public PhoenixScanSpec scanSpec() { + return scanSpec; + } + + @JsonProperty("rows") + public double rows() { + return rows; + } + + @JsonProperty("scanStats") + public ScanStats scanStats() { + return scanStats; + } + + @JsonIgnore + public PhoenixStoragePlugin plugin() { + return plugin; + } + + @JsonProperty("config") + public StoragePluginConfig config() { + return plugin.getConfig(); + } + + @Override + public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { } + + @Override + public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException { + return new PhoenixSubScan(sql, columns, scanSpec, plugin); + } + + @Override + public int getMaxParallelizationWidth() { + return 1; + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + return new PhoenixGroupScan(this); + } + + @Override + public ScanStats getScanStats() { + return scanStats; + } + + @Override + public GroupScan clone(List<SchemaPath> columns) { + return new PhoenixGroupScan(this, columns); + } + + @Override + public int hashCode() { + if (hashCode == 0) { + hashCode = Objects.hash(sql, columns, scanSpec, rows, plugin.getConfig()); + } + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if(this == obj) { Review comment: Nit: insert space after `if` here and below. ########## File path: contrib/storage-phoenix/src/main/resources/bootstrap-storage-plugins.json ########## @@ -0,0 +1,14 @@ +{ + "storage": { + "phoenix": { + "type": "phoenix", + "jdbcURL": "jdbc:phoenix:thin:url=http://the.queryserver.hostname:8765;serialization=PROTOBUF", + "username": "drill", + "password": "drill", Review comment: Is this the default out-of-the-box Phoenix password? Probably not. Maybe we should use the default one so a simple install & run works without fiddling? ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixPrel.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix.rules; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.adapter.jdbc.JdbcImplementor; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.AbstractRelNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.sql.SqlDialect; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.store.SubsetRemover; +import org.apache.drill.exec.store.phoenix.PhoenixGroupScan; + +public class PhoenixPrel extends AbstractRelNode implements Prel { + + private final String sql; + private final double rows; + private final PhoenixConvention convention; + + public PhoenixPrel(RelOptCluster cluster, RelTraitSet traitSet, PhoenixIntermediatePrel prel) { + super(cluster, traitSet); + final RelNode input = prel.getInput(); + rows = input.estimateRowCount(cluster.getMetadataQuery()); + convention = (PhoenixConvention) input.getTraitSet().getTrait(ConventionTraitDef.INSTANCE); + final SqlDialect dialect = convention.getPlugin().getDialect(); + final JdbcImplementor jdbcImplementor = new PhoenixImplementor(dialect, (JavaTypeFactory) getCluster().getTypeFactory()); + final JdbcImplementor.Result result = jdbcImplementor.visitChild(0, input.accept(SubsetRemover.INSTANCE)); + sql = result.asStatement().toSqlString(dialect).getSql(); + rowType = input.getRowType(); + } + + @Override + public Iterator<Prel> iterator() { + return Collections.emptyIterator(); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) + throws IOException { + List<SchemaPath> columns = new ArrayList<SchemaPath>(); + for (String col : rowType.getFieldNames()) { + columns.add(SchemaPath.getSimplePath(col)); + } + PhoenixGroupScan output = new PhoenixGroupScan(sql, columns, null, rows, convention.getPlugin()); + return creator.addMetadata(this, output); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).item("sql", stripToOneLineSql(sql)); + } + + private String stripToOneLineSql(String sql) { + StringBuilder sbt = new StringBuilder(sql.length()); + String[] sqlToken = sql.split("\\n"); + for (String sqlText : sqlToken) { + if (!sqlText.trim().startsWith("--")) { Review comment: Sadly, the following is also legal (in most SQL): ``` SELECT a, -- first comment b, -- second comment FROM ... ``` The above code will produce: ```sql SELECT a, --first comment b -- second comment FROM ... ``` This leads to a question: why convert to a single line? Is this to convert SQL to Phoenix? Can't Phoenix parse newlines? (This would be very odd as the SQL standard says newlines are fine whitespace.) If you do have to remove newlines, you may need to have a mini-parser to handle comments and newlines. [This page](https://phoenix.apache.org/language/index.html#comments) suggests that Phoenix can handle comments and newlines. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixBatchReader.java ########## @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.slf4j.LoggerFactory; + +public class PhoenixBatchReader implements ManagedReader<SchemaNegotiator> { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PhoenixBatchReader.class); + + private final PhoenixSubScan subScan; + private CustomErrorContext errorContext; + private PhoenixReader reader; + private Connection conn; + private PreparedStatement pstmt; + private ResultSet rs; + private ResultSetMetaData meta; + private ColumnDefn[] columns; + private Stopwatch watch; + private int count = 0; + + public PhoenixBatchReader(PhoenixSubScan subScan) { + this.subScan = subScan; + } + + @Override + public boolean open(SchemaNegotiator negotiator) { + try { + errorContext = negotiator.parentErrorContext(); + conn = subScan.getPlugin().getDataSource().getConnection(); + pstmt = conn.prepareStatement(subScan.getSql()); + rs = pstmt.executeQuery(); + meta = pstmt.getMetaData(); + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to execute the phoenix sql query. " + e.getMessage()) + .build(logger); + } + try { + negotiator.tableSchema(defineMetadata(), true); + reader = new PhoenixReader(negotiator.build()); + bindColumns(reader.getStorage()); + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to get type of columns from metadata. " + e.getMessage()) + .build(logger); + } + watch = Stopwatch.createStarted(); + return true; + } + + @Override + public boolean next() { + try { + while (rs.next()) { + { // TODO refactor this to PhoenixReader + reader.getStorage().start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { + Array result = rs.getArray(index + 1); + if (result != null) { + columns[index].load(result.getArray()); + } + } else { + columns[index].load(rs.getObject(index + 1)); + } + } + count++; + reader.getStorage().save(); + } + if (reader.getStorage().isFull()) { // batch full but not reached the EOF + return true; + } + } + } catch (SQLException e) { + throw UserException + .dataReadError(e) + .message("Failed to get the data from the result set. " + e.getMessage()) + .build(logger); + } + watch.stop(); + logger.debug("Phoenix fetch total record numbers : {}", count); + return false; // the EOF is reached. + } + + @Override + public void close() { + count = reader.getStorage().loader().batchCount(); + logger.debug("Phoenix fetch batch size : {}, took {} ms. ", count, watch.elapsed(TimeUnit.MILLISECONDS)); + AutoCloseables.closeSilently(rs, pstmt, conn); + } + + private TupleMetadata defineMetadata() throws SQLException { + List<SchemaPath> cols = subScan.getColumns(); + columns = new ColumnDefn[cols.size()]; + SchemaBuilder builder = new SchemaBuilder(); + for (int index = 0; index < cols.size(); index++) { + int sqlType = meta.getColumnType(index + 1); // column the first column is 1 + String columnName = cols.get(index).rootName(); + columns[index] = makeColumn(columnName, sqlType, meta.getColumnTypeName(index + 1), index); + columns[index].define(builder); + } + return builder.buildSchema(); + } + + private ColumnDefn makeColumn(String name, int sqlType, String baseType, int index) { + if (sqlType == Types.ARRAY) { + return new ArrayDefn(name, sqlType, baseType, index); + } + return new GenericDefn(name, sqlType, index); + } + + private void bindColumns(RowSetLoader loader) { + for (int i = 0; i < columns.length; i++) { + columns[i].bind(loader); + } + } + + protected static final Map<Integer, MinorType> COLUMN_TYPE_MAP = Maps.newHashMap(); + + static { + // text + COLUMN_TYPE_MAP.put(Types.VARCHAR, MinorType.VARCHAR); + COLUMN_TYPE_MAP.put(Types.CHAR, MinorType.VARCHAR); + // numbers + COLUMN_TYPE_MAP.put(Types.BIGINT, MinorType.BIGINT); + COLUMN_TYPE_MAP.put(Types.INTEGER, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.SMALLINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.TINYINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.DOUBLE, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.FLOAT, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.DECIMAL, MinorType.VARDECIMAL); + // time + COLUMN_TYPE_MAP.put(Types.DATE, MinorType.DATE); + COLUMN_TYPE_MAP.put(Types.TIME, MinorType.TIME); + COLUMN_TYPE_MAP.put(Types.TIMESTAMP, MinorType.TIMESTAMP); + // binary + COLUMN_TYPE_MAP.put(Types.BINARY, MinorType.VARBINARY); // Raw fixed length byte array. Mapped to byte[]. + COLUMN_TYPE_MAP.put(Types.VARBINARY, MinorType.VARBINARY); // Raw variable length byte array. + // boolean + COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT); + } + + public abstract static class ColumnDefn { + + final String name; + final int index; + final int sqlType; + ColumnWriter writer; + + public String getName() { + return name; + } + + public int getIndex() { + return index; + } + + public int getSqlType() { + return sqlType; + } + + public ColumnDefn(String name, int sqlType, int index) { + this.name = name; + this.sqlType = sqlType; + this.index = index; + } + + public void define(SchemaBuilder builder) { + builder.addNullable(getName(), COLUMN_TYPE_MAP.get(getSqlType())); + } + + public void bind(RowSetLoader loader) { + writer = loader.scalar(getName()); + } + + public abstract void load(Object value); + } + + public static class GenericDefn extends ColumnDefn { + + public GenericDefn(String name, int sqlType, int index) { + super(name, sqlType, index); + } + + @Override + public void load(Object value) { // TODO refactor this to AbstractScalarWriter + ScalarWriter scalarWriter = (ScalarWriter) writer; + switch (getSqlType()) { Review comment: Suggestion: we're in the innermost loop here: reading every column for every row. The preferred way to handle multiple types is with a class (or lambda) that we can jump to directly without the indirection of a switch. In existing code, we tend to create a class per type. Might be fun to try the more modern approach a lambda that implements `Consumer`. The result is that the virtual function call to `load()` gets you directly to the code that sets the value: no need for a per-column switch statement. ########## File path: exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java ########## @@ -174,6 +174,14 @@ public void setObject(Object array) { setLongObjectArray((Long[]) array); } else if (memberClassName.equals(Double.class.getName())) { setDoubleObjectArray((Double[]) array); + } else if (memberClassName.equals(Float.class.getName())) { + setFloatObjectArray((Float[]) array); + } else if (memberClassName.equals(Short.class.getName())) { + setShortObjectArray((Short[]) array); + } else if (memberClassName.equals(Byte.class.getName())) { + setByteObjectArray((Byte[]) array); + } else if (memberClassName.equals(Boolean.class.getName())) { + setBooleanObjectArray((Boolean[]) array); Review comment: Thanks for adding these missing types! Please add unit tests for these changes. I believe there are tests which exercise this code; you can just add new cases for the new types. ########## File path: exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java ########## @@ -201,12 +220,34 @@ public void setByteArray(byte[] value) { } } + public void setByteObjectArray(Byte[] value) { + for (int i = 0; i < value.length; i++) { + final Byte element = value[i]; + if (element == null) { + elementWriter.setNull(); + } else { + elementWriter.setInt(element); + } + } + } + public void setShortArray(short[] value) { for (int i = 0; i < value.length; i++) { elementWriter.setInt(value[i]); } } + public void setShortObjectArray(Short[] value) { + for (int i = 0; i < value.length; i++) { + final Short element = value[i]; + if (element == null) { + elementWriter.setNull(); Review comment: Was this tested? As it turns out, Drill does not allow individual array elements to be null, so I believe this call will throw an exception. Drill only allows the entire array to be empty (which Drill treats equivalent to NULL, IIRC.) One way to handle nulls is to replace them with a default value. Since that default value is likely to be datasource-specific, the conversion should be done by the caller. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
