[ https://issues.apache.org/jira/browse/DRILL-7863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450247#comment-17450247 ]
ASF GitHub Bot commented on DRILL-7863: --------------------------------------- paul-rogers commented on a change in pull request #2332: URL: https://github.com/apache/drill/pull/2332#discussion_r758097799 ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java ########## @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +public class PhoenixReader { + + private final RowSetLoader writer; + private final ColumnDefn[] columns; + private final ResultSet results; + private long count; + + public PhoenixReader(ResultSetLoader loader, ColumnDefn[] columns, ResultSet results) { + this.writer = loader.writer(); + this.columns = columns; + this.results = results; + } + + public RowSetLoader getStorage() { + return writer; + } + + public long getCount() { + return count; + } + + /** + * Fetch and process one row. + * @return return true if one row is processed, return false if there is no next row. 
+ * @throws SQLException + */ + public boolean processRow() throws SQLException { + if (results.next()) { + writer.start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { + Array result = results.getArray(index + 1); + if (result != null) { + columns[index].load(result.getArray()); + } + } else { + Object result = results.getObject(index + 1); + if (result != null) { + columns[index].load(result); + } + } + } + count++; + writer.save(); + return true; + } + return false; + } + + protected static final Map<Integer, MinorType> COLUMN_TYPE_MAP = Maps.newHashMap(); + + static { + // text + COLUMN_TYPE_MAP.put(Types.VARCHAR, MinorType.VARCHAR); + COLUMN_TYPE_MAP.put(Types.CHAR, MinorType.VARCHAR); + // numbers + COLUMN_TYPE_MAP.put(Types.BIGINT, MinorType.BIGINT); + COLUMN_TYPE_MAP.put(Types.INTEGER, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.SMALLINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.TINYINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.DOUBLE, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.FLOAT, MinorType.FLOAT4); + COLUMN_TYPE_MAP.put(Types.DECIMAL, MinorType.VARDECIMAL); + // time + COLUMN_TYPE_MAP.put(Types.DATE, MinorType.DATE); + COLUMN_TYPE_MAP.put(Types.TIME, MinorType.TIME); + COLUMN_TYPE_MAP.put(Types.TIMESTAMP, MinorType.TIMESTAMP); + // binary + COLUMN_TYPE_MAP.put(Types.BINARY, MinorType.VARBINARY); // Raw fixed length byte array. Mapped to byte[]. + COLUMN_TYPE_MAP.put(Types.VARBINARY, MinorType.VARBINARY); // Raw variable length byte array. 
+ // boolean + COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT); + } + + protected abstract static class ColumnDefn { + + final String name; + final int index; + final int sqlType; + ColumnWriter writer; + + public String getName() { + return name; + } + + public int getIndex() { + return index; + } + + public int getSqlType() { + return sqlType; + } + + public ColumnDefn(String name, int index, int sqlType) { + this.name = name; + this.index = index; + this.sqlType = sqlType; + } + + public void define(SchemaBuilder builder) { + builder.addNullable(getName(), COLUMN_TYPE_MAP.get(getSqlType())); + } + + public void bind(RowSetLoader loader) { + writer = loader.scalar(getName()); + } + + public abstract void load(Object value); + } + + protected static abstract class GenericDefn extends ColumnDefn { + + public GenericDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + } + + protected static class GenericVarcharDefn extends GenericDefn { + + public GenericVarcharDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setString((String) value); + } + } + + protected static class GenericBigintDefn extends GenericDefn { + + public GenericBigintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setLong((Long) value); + } + } + + protected static class GenericIntegerDefn extends GenericDefn { + + public GenericIntegerDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setInt((Integer) value); + } + } + + protected static class GenericSmallintDefn extends GenericDefn { + + public GenericSmallintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) 
writer).setInt((Short) value); + } + } + + protected static class GenericTinyintDefn extends GenericDefn { + + public GenericTinyintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setInt((Byte) value); + } + } + + protected static class GenericDoubleDefn extends GenericDefn { + + public GenericDoubleDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setDouble((Double) value); + } + } + + protected static class GenericFloatDefn extends GenericDefn { + + public GenericFloatDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setFloat((Float) value); Review comment: Two minor suggestions. First, maybe have an abstract scalar class to avoid the repeated casts. The scalar abstract class holds the `ScalarWriter` while the array version holds the `ScalarArrayWriter`. Second, for scalars, it is fine to call `setObject()` which will do the cast for you. The scalar writers don't do any of the "object parsing" logic that arrays do: they expect the object to be of the correct type. With that, you'd have: ```java writer.setObject(value) ``` Because of that, you can have one class for the types that don't need conversion (which seems to be Float, Double, String, etc.) since the implementation would be the same for all of them. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java ########## @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +public class PhoenixReader { + + private final RowSetLoader writer; + private final ColumnDefn[] columns; + private final ResultSet results; + private long count; + + public PhoenixReader(ResultSetLoader loader, ColumnDefn[] columns, ResultSet results) { + this.writer = loader.writer(); + this.columns = columns; + this.results = results; + } + + public RowSetLoader getStorage() { + return writer; + } + + public long getCount() { + return count; + } + + /** + * Fetch and process one row. + * @return return true if one row is processed, return false if there is no next row. 
+ * @throws SQLException + */ + public boolean processRow() throws SQLException { + if (results.next()) { + writer.start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { + Array result = results.getArray(index + 1); + if (result != null) { + columns[index].load(result.getArray()); + } + } else { + Object result = results.getObject(index + 1); + if (result != null) { + columns[index].load(result); + } + } + } + count++; Review comment: The result set loader maintains row and batch counts for you, if you need them. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.phoenix; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig; +import org.apache.drill.common.logical.security.CredentialsProvider; +import org.apache.drill.exec.store.security.CredentialProviderUtils; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(PhoenixStoragePluginConfig.NAME) +public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConfig { + + public static final String NAME = "phoenix"; + public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver"; + public static final String FAT_DRIVER_CLASS = "org.apache.phoenix.jdbc.PhoenixDriver"; + + private final String host; + private final int port; + private final String jdbcURL; // (options) Equal to host + port + private final Map<String, Object> props; // (options) See also http://phoenix.apache.org/tuning.html + + @JsonCreator + public PhoenixStoragePluginConfig( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("username") String username, + @JsonProperty("password") String password, + @JsonProperty("jdbcURL") String jdbcURL, + @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider, + @JsonProperty("props") Map<String, Object> props) { + super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null); + this.host = host; + this.port = port == 0 ? 8765 : port; + this.jdbcURL = jdbcURL; + this.props = props == null ? 
Collections.emptyMap() : props; + } + + @JsonIgnore + public UsernamePasswordCredentials getUsernamePasswordCredentials() { + return new UsernamePasswordCredentials(credentialsProvider); + } + + @JsonProperty("host") + public String getHost() { + return host; + } + + @JsonProperty("port") + public int getPort() { + return port; + } + + @JsonProperty("username") + public String getUsername() { + if (directCredentials) { + return getUsernamePasswordCredentials().getUsername(); + } + return null; + } + + @JsonIgnore + @JsonProperty("password") + public String getPassword() { + if (directCredentials) { + return getUsernamePasswordCredentials().getPassword(); Review comment: This can probably just be `getCredentials()` since there are no credentials here other than user name/password. ########## File path: exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java ########## @@ -201,12 +218,30 @@ public void setByteArray(byte[] value) { } } + public void setByteObjectArray(Byte[] value) { + for (int i = 0; i < value.length; i++) { + final Byte element = value[i]; + if (element != null) { + elementWriter.setInt(element); + } + } + } + public void setShortArray(short[] value) { for (int i = 0; i < value.length; i++) { elementWriter.setInt(value[i]); } } + public void setShortObjectArray(Short[] value) { + for (int i = 0; i < value.length; i++) { + final Short element = value[i]; + if (element != null) { + elementWriter.setInt(element); Review comment: Thanks for fixing this. While you're at it, can you fix the existing `setFooObjectArray()` methods that call `setNull()`? Not sure how they got into the code... ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java ########## @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +public class PhoenixReader { + + private final RowSetLoader writer; + private final ColumnDefn[] columns; + private final ResultSet results; + private long count; + + public PhoenixReader(ResultSetLoader loader, ColumnDefn[] columns, ResultSet results) { + this.writer = loader.writer(); + this.columns = columns; + this.results = results; + } + + public RowSetLoader getStorage() { + return writer; + } + + public long getCount() { + return count; + } + + /** + * Fetch and process one row. 
+ * @return return true if one row is processed, return false if there is no next row. + * @throws SQLException + */ + public boolean processRow() throws SQLException { + if (results.next()) { + writer.start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { Review comment: Looking better. Even this if-statement can be removed. Each column should know its index. Then, for the generic scalars (see comment below) you could write: ```java public void load(Something results) { scalar.setObject(results.getObject(index)) } ``` Similar for the array load. Note that the stored index would be for the `result` object to avoid the `+ 1` for every column. Your loop would then look like this: ```java for (Something column : columns) { column.load(results); } ``` Code can't get much simpler than that. ########## File path: contrib/storage-phoenix/src/test/resources/hbase-site.xml ########## @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<configuration> + <property> + <name>hbase.master.start.timeout.localHBaseCluster</name> + <value>60000</value> + </property> + <property> + <name>phoenix.schema.isNamespaceMappingEnabled</name> + <value>true</value> + </property> +</configuration> Review comment: Nit: missing final newline. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.phoenix; + +import java.io.IOException; +import java.util.Set; + +import javax.sql.DataSource; + +import org.apache.calcite.adapter.jdbc.JdbcSchema; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlDialectFactoryImpl; +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.phoenix.rules.PhoenixConvention; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +import com.fasterxml.jackson.core.type.TypeReference; + +public class PhoenixStoragePlugin extends AbstractStoragePlugin { + + private final PhoenixStoragePluginConfig config; + private final DataSource dataSource; + private final SqlDialect dialect; + private final PhoenixConvention convention; + private final PhoenixSchemaFactory schemaFactory; + + public PhoenixStoragePlugin(PhoenixStoragePluginConfig config, DrillbitContext context, String name) { + super(context, name); + this.config = config; + this.dataSource = initNoPoolingDataSource(config); + this.dialect = JdbcSchema.createDialect(SqlDialectFactoryImpl.INSTANCE, dataSource); + this.convention = new PhoenixConvention(dialect, name, this); + this.schemaFactory = new PhoenixSchemaFactory(this); + } + + @Override + public StoragePluginConfig getConfig() { + return config; + } + + public DataSource getDataSource() { + return dataSource; + } + + public SqlDialect getDialect() { + return dialect; + } + + public PhoenixConvention getConvention() { + return convention; + } + + 
@Override + public boolean supportsRead() { + return true; + } + + @Override + public Set<? extends RelOptRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) { + return convention.getRules(); + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + schemaFactory.registerSchemas(schemaConfig, parent); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + PhoenixScanSpec scanSpec = selection.getListWith(context.getLpPersistence().getMapper(), new TypeReference<PhoenixScanSpec>() {}); + return new PhoenixGroupScan(scanSpec, this); + } + + private static DataSource initNoPoolingDataSource(PhoenixStoragePluginConfig config) { + // Don't use the pool with the connection + PhoenixDataSource dataSource = null; + if (StringUtils.isNotBlank(config.getJdbcURL())) { + if (!config.getProps().isEmpty()) { + dataSource = new PhoenixDataSource(config.getJdbcURL(), config.getProps()); + } else { + dataSource = new PhoenixDataSource(config.getJdbcURL()); + } + } else { + if (!config.getProps().isEmpty()) { + dataSource = new PhoenixDataSource(config.getHost(), config.getPort(), config.getProps()); + } else { + dataSource = new PhoenixDataSource(config.getHost(), config.getPort()); + } + } Review comment: Suggestion: ```java Map<String, Object> props = config.getProps(); if (props == null) { props = new HashMap<>(); } ``` Then, you can eliminate the various props/no props variations, just always pass the (possibly empty) props. Even better, check if the props version accepts a null props value. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixReader.java ########## @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +public class PhoenixReader { + + private final RowSetLoader writer; + private final ColumnDefn[] columns; + private final ResultSet results; + private long count; + + public PhoenixReader(ResultSetLoader loader, ColumnDefn[] columns, ResultSet results) { + this.writer = loader.writer(); + this.columns = columns; + this.results = results; + } + + public RowSetLoader getStorage() { + return writer; + } + + public long getCount() { + return count; + } + + /** + * Fetch and process one row. 
+ * @return return true if one row is processed, return false if there is no next row. + * @throws SQLException + */ + public boolean processRow() throws SQLException { + if (results.next()) { + writer.start(); + for (int index = 0; index < columns.length; index++) { + if (columns[index].getSqlType() == Types.ARRAY) { + Array result = results.getArray(index + 1); + if (result != null) { + columns[index].load(result.getArray()); + } + } else { + Object result = results.getObject(index + 1); + if (result != null) { + columns[index].load(result); + } + } + } + count++; + writer.save(); + return true; + } + return false; + } + + protected static final Map<Integer, MinorType> COLUMN_TYPE_MAP = Maps.newHashMap(); + + static { + // text + COLUMN_TYPE_MAP.put(Types.VARCHAR, MinorType.VARCHAR); + COLUMN_TYPE_MAP.put(Types.CHAR, MinorType.VARCHAR); + // numbers + COLUMN_TYPE_MAP.put(Types.BIGINT, MinorType.BIGINT); + COLUMN_TYPE_MAP.put(Types.INTEGER, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.SMALLINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.TINYINT, MinorType.INT); + COLUMN_TYPE_MAP.put(Types.DOUBLE, MinorType.FLOAT8); + COLUMN_TYPE_MAP.put(Types.FLOAT, MinorType.FLOAT4); + COLUMN_TYPE_MAP.put(Types.DECIMAL, MinorType.VARDECIMAL); + // time + COLUMN_TYPE_MAP.put(Types.DATE, MinorType.DATE); + COLUMN_TYPE_MAP.put(Types.TIME, MinorType.TIME); + COLUMN_TYPE_MAP.put(Types.TIMESTAMP, MinorType.TIMESTAMP); + // binary + COLUMN_TYPE_MAP.put(Types.BINARY, MinorType.VARBINARY); // Raw fixed length byte array. Mapped to byte[]. + COLUMN_TYPE_MAP.put(Types.VARBINARY, MinorType.VARBINARY); // Raw variable length byte array. 
+ // boolean + COLUMN_TYPE_MAP.put(Types.BOOLEAN, MinorType.BIT); + } + + protected abstract static class ColumnDefn { + + final String name; + final int index; + final int sqlType; + ColumnWriter writer; + + public String getName() { + return name; + } + + public int getIndex() { + return index; + } + + public int getSqlType() { + return sqlType; + } + + public ColumnDefn(String name, int index, int sqlType) { + this.name = name; + this.index = index; + this.sqlType = sqlType; + } + + public void define(SchemaBuilder builder) { + builder.addNullable(getName(), COLUMN_TYPE_MAP.get(getSqlType())); + } + + public void bind(RowSetLoader loader) { + writer = loader.scalar(getName()); + } + + public abstract void load(Object value); + } + + protected static abstract class GenericDefn extends ColumnDefn { + + public GenericDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + } + + protected static class GenericVarcharDefn extends GenericDefn { + + public GenericVarcharDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setString((String) value); + } + } + + protected static class GenericBigintDefn extends GenericDefn { + + public GenericBigintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setLong((Long) value); + } + } + + protected static class GenericIntegerDefn extends GenericDefn { + + public GenericIntegerDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setInt((Integer) value); + } + } + + protected static class GenericSmallintDefn extends GenericDefn { + + public GenericSmallintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) 
writer).setInt((Short) value); + } + } + + protected static class GenericTinyintDefn extends GenericDefn { + + public GenericTinyintDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setInt((Byte) value); + } + } + + protected static class GenericDoubleDefn extends GenericDefn { + + public GenericDoubleDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setDouble((Double) value); + } + } + + protected static class GenericFloatDefn extends GenericDefn { + + public GenericFloatDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setFloat((Float) value); + } + } + + protected static class GenericDecimalDefn extends GenericDefn { + + public GenericDecimalDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setDecimal((BigDecimal) value); + } + } + + protected static class GenericDateDefn extends GenericDefn { + + public GenericDateDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setDate(((Date) value).toLocalDate()); + } + } + + protected static class GenericTimeDefn extends GenericDefn { + + public GenericTimeDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setTime(((Time) value).toLocalTime()); + } + } + + protected static class GenericTimestampDefn extends GenericDefn { + + public GenericTimestampDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) 
writer).setTimestamp(((Timestamp) value).toInstant()); + } + } + + protected static class GenericBinaryDefn extends GenericDefn { + + public GenericBinaryDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + byte[] byteValue = (byte[]) value; + ((ScalarWriter) writer).setBytes(byteValue, byteValue.length); + } + } + + protected static class GenericBooleanDefn extends GenericDefn { + + public GenericBooleanDefn(String name, int index, int sqlType) { + super(name, index, sqlType); + } + + @Override + public void load(Object value) { + ((ScalarWriter) writer).setBoolean((Boolean) value); + } + } + + protected static abstract class ArrayDefn extends ColumnDefn { + + static final String VARCHAR = "VARCHAR ARRAY"; + static final String CHAR = "CHAR ARRAY"; + static final String BIGINT = "BIGINT ARRAY"; + static final String INTEGER = "INTEGER ARRAY"; + static final String DOUBLE = "DOUBLE ARRAY"; + static final String FLOAT = "FLOAT ARRAY"; + static final String SMALLINT = "SMALLINT ARRAY"; + static final String TINYINT = "TINYINT ARRAY"; + static final String BOOLEAN = "BOOLEAN ARRAY"; + + final String baseType; + + public ArrayDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType); + this.baseType = baseType; + } + + @Override + public void bind(RowSetLoader loader) { + writer = loader.array(getName()); + } + } + + protected static class ArrayVarcharDefn extends ArrayDefn { + + public ArrayVarcharDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.VARCHAR); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, String[].class)); + } + } + + protected static class ArrayBigintDefn extends ArrayDefn { + + public 
ArrayBigintDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.BIGINT); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Long[].class)); + } + } + + protected static class ArrayIntegerDefn extends ArrayDefn { + + public ArrayIntegerDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.INT); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Integer[].class)); + } + } + + protected static class ArraySmallintDefn extends ArrayDefn { + + public ArraySmallintDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.SMALLINT); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Short[].class)); + } + } + + protected static class ArrayTinyintDefn extends ArrayDefn { + + public ArrayTinyintDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.TINYINT); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Byte[].class)); + } + } + + protected static class ArrayDoubleDefn extends ArrayDefn { + + public ArrayDoubleDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, 
baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.FLOAT8); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Double[].class)); + } + } + + protected static class ArrayBooleanDefn extends ArrayDefn { + + public ArrayBooleanDefn(String name, int index, int sqlType, String baseType) { + super(name, index, sqlType, baseType); + } + + @Override + public void define(SchemaBuilder builder) { + builder.addArray(getName(), MinorType.BIT); + } + + @Override + public void load(Object value) { + Object[] values = (Object[]) value; + writer.setObject(Arrays.copyOf(values, values.length, Boolean[].class)); Review comment: This is good. We can do better. `setObject()` for an array parses the array type, which is more-or-less the switch statement we wanted to avoid. In order for the parser to work, you had to copy the array, which is a nuisance. So, instead ensure that `writer` is of type `ScalarArrayWriter` and call `setBooleanObjectArray((Boolean[]) value)` instead. (But, it seems that we don't have this method, so it would need to be added...) Now, it might be that the cast won't work. So, the next change would be to add a `setObjectArray()` method to `ScalarArrayWriter` something like this: ```java public void setObjectArray(Object[] value) { for (int i = 0; i < value.length; i++) { final Object element = value[i]; if (element == null) { elementWriter.setNull(); } else { elementWriter.setObject(element); } } } ``` In the above, the `setObject()` call will be implemented by the underlying Boolean writer, which will expect the object to be a `Boolean`. If either of these works, apply the same change to the other array classes here. 
########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.phoenix; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig; +import org.apache.drill.common.logical.security.CredentialsProvider; +import org.apache.drill.exec.store.security.CredentialProviderUtils; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(PhoenixStoragePluginConfig.NAME) +public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConfig { + + public static final String NAME = "phoenix"; + public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver"; + 
public static final String FAT_DRIVER_CLASS = "org.apache.phoenix.jdbc.PhoenixDriver"; + + private final String host; + private final int port; + private final String jdbcURL; // (options) Equal to host + port + private final Map<String, Object> props; // (options) See also http://phoenix.apache.org/tuning.html + + @JsonCreator + public PhoenixStoragePluginConfig( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("username") String username, + @JsonProperty("password") String password, + @JsonProperty("jdbcURL") String jdbcURL, + @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider, + @JsonProperty("props") Map<String, Object> props) { + super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null); + this.host = host; + this.port = port == 0 ? 8765 : port; + this.jdbcURL = jdbcURL; + this.props = props == null ? Collections.emptyMap() : props; + } + + @JsonIgnore + public UsernamePasswordCredentials getUsernamePasswordCredentials() { + return new UsernamePasswordCredentials(credentialsProvider); + } + + @JsonProperty("host") + public String getHost() { + return host; + } + + @JsonProperty("port") + public int getPort() { + return port; + } + + @JsonProperty("username") + public String getUsername() { + if (directCredentials) { + return getUsernamePasswordCredentials().getUsername(); + } + return null; + } + + @JsonIgnore + @JsonProperty("password") + public String getPassword() { + if (directCredentials) { + return getUsernamePasswordCredentials().getPassword(); + } + return null; + } + + @JsonProperty("jdbcURL") + public String getJdbcURL() { + return jdbcURL; + } + + @JsonProperty("props") + public Map<String, Object> getProps() { + return props; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null || !(o instanceof PhoenixStoragePluginConfig)) { + return false; + } + 
PhoenixStoragePluginConfig config = (PhoenixStoragePluginConfig) o; + // URL first + if (StringUtils.isNotBlank(config.getJdbcURL())) { + return Objects.equals(this.jdbcURL, config.getJdbcURL()); + } + // Then the host and port + return Objects.equals(this.host, config.getHost()) && Objects.equals(this.port, config.getPort()); + } + + @Override + public int hashCode() { + if (StringUtils.isNotBlank(jdbcURL)) { + return Objects.hash(jdbcURL); + } + return Objects.hash(host, port); + } + + @Override + public String toString() { + return new PlanStringBuilder(PhoenixStoragePluginConfig.NAME) + .field("driverName", THIN_DRIVER_CLASS) + .field("host", host) + .field("port", port) + .field("jdbcURL", jdbcURL) + .field("props", props) Review comment: Should this include the user name? And an obfuscated password? That is, show, say, "*****" if the password is set. (Show the same number of asterisks regardless of password length.) ########## File path: contrib/storage-phoenix/src/main/resources/logback-test.xml.bak ########## @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> Review comment: Did you check if this is actually needed? 
AFAIK, Logback will see all these files on the class path, and will load only one. Also, `logback-test.xml` probably should not be in the `main/resources`, else Logback will load it even in production. If this was copy-pasted from another extension plugin, then that one is probably also wrong. ########## File path: contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePluginConfig.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.phoenix; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.logical.AbstractSecuredStoragePluginConfig; +import org.apache.drill.common.logical.security.CredentialsProvider; +import org.apache.drill.exec.store.security.CredentialProviderUtils; +import org.apache.drill.exec.store.security.UsernamePasswordCredentials; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(PhoenixStoragePluginConfig.NAME) +public class PhoenixStoragePluginConfig extends AbstractSecuredStoragePluginConfig { + + public static final String NAME = "phoenix"; + public static final String THIN_DRIVER_CLASS = "org.apache.phoenix.queryserver.client.Driver"; + public static final String FAT_DRIVER_CLASS = "org.apache.phoenix.jdbc.PhoenixDriver"; + + private final String host; + private final int port; + private final String jdbcURL; // (options) Equal to host + port + private final Map<String, Object> props; // (options) See also http://phoenix.apache.org/tuning.html + + @JsonCreator + public PhoenixStoragePluginConfig( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("username") String username, Review comment: Nit: a quick search of the sources suggests the Drill convention here is `"userName"` (upper case "N"). 
########## File path: contrib/storage-phoenix/src/main/resources/bootstrap-storage-plugins.json ########## @@ -0,0 +1,14 @@ +{ + "storage": { + "phoenix": { + "type": "phoenix", + "jdbcURL": "jdbc:phoenix:thin:url=http://the.queryserver.hostname:8765;serialization=PROTOBUF", Review comment: Just to clarify, can this be: ```json "jdbcURL": "jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF", ``` Actually, let's think about this a bit more. You're using JDBC with the thin client. You require the `PROTOBUF` serialization. It looks like the config allows a host and port, and the reader code handles that case. So, can the JSON instead be: ```json "host": "localhost", "port": 8765, ``` It is much more likely people will get the above right: they only have to change the host field. No chance to accidentally change the wrong bits of the URL. Leave the URL only for an advanced case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Storage Plugin for Apache Phoenix > ------------------------------------- > > Key: DRILL-7863 > URL: https://issues.apache.org/jira/browse/DRILL-7863 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other > Reporter: Cong Luo > Assignee: Cong Luo > Priority: Major > > There is a to-do list: > # MVP on EVF. > # Security Authentication. > # Support both the thin (PQS) and fat (ZK) drivers. > # Compatibility with Phoenix 4.x and 5.x. > # Shaded dependencies. -- This message was sent by Atlassian Jira (v8.20.1#820001)