stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797705703
##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,104 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;

 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;

 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;

-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {

-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;

-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }

-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }

-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }

     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }

     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
+            final org.apache.hadoop.hbase.client.Scan scan = queryPlan.getContext().getScan();

             // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
-            if (snapshotName.isPresent())
-                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
-                        getQueryServices().getConfiguration(), snapshotName.get());
+            String snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+
+            if(snapshotName != null){
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().getQueryServices().getConfiguration(), snapshotName);
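For readers following the migration in this hunk: in the Spark 3 connector API, the pushdown hooks this diff deletes (`pushFilters`, the `pushedFilters` array, the derived `whereClause`) move off the scan and onto a `ScanBuilder`, which is why the new `PhoenixScan` can receive `whereClause` as a ready-made constructor argument. Below is a minimal sketch of what such a builder could look like; it is not the PR's actual code. The `PhoenixScanBuilder` name and the reuse of `FilterExpressionCompiler` (taken from the removed reader code above) are assumptions, since this hunk does not show the builder.

```java
// Sketch only -- not the code under review. Assumes a companion builder
// class (hypothetically named PhoenixScanBuilder) living in the same
// package as PhoenixScan, and that org.apache.phoenix.spark.
// FilterExpressionCompiler (the import removed in the diff) still exists
// to translate Spark Filters into a Phoenix WHERE clause.
import org.apache.phoenix.spark.FilterExpressionCompiler;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Tuple3;

public class PhoenixScanBuilder implements ScanBuilder,
        SupportsPushDownFilters, SupportsPushDownRequiredColumns {

    private final CaseInsensitiveStringMap options;
    private StructType schema;                      // pruned via pruneColumns()
    private Filter[] pushedFilters = new Filter[0]; // reported back to Spark
    private String whereClause;                     // compiled from pushed filters

    PhoenixScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
        this.schema = schema;
        this.options = options;
    }

    @Override
    public Filter[] pushFilters(Filter[] filters) {
        // Same call the removed reader code made: _1 is the WHERE clause,
        // _2 the filters Spark must still evaluate itself, _3 the filters
        // Phoenix will handle server-side.
        Tuple3<String, Filter[], Filter[]> compiled =
                new FilterExpressionCompiler().pushFilters(filters);
        whereClause = compiled._1();
        pushedFilters = compiled._3();
        return compiled._2();
    }

    @Override
    public Filter[] pushedFilters() {
        return pushedFilters;
    }

    @Override
    public void pruneColumns(StructType requiredSchema) {
        // Spark hands back only the columns the query needs; PhoenixScan
        // then SELECTs exactly schema.names() in planInputPartitions().
        this.schema = requiredSchema;
    }

    @Override
    public Scan build() {
        // PhoenixScan (the class in the diff) doubles as the Batch, via
        // toBatch(), that plans the InputPartitions.
        return new PhoenixScan(schema, options, whereClause);
    }
}
```

This split is the central design change in the Spark 3 read path: all pushdown negotiation happens on the mutable builder, and `build()` hands the finished state to an immutable `Scan`, which here also serves as its own `Batch`.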
Review comment:
       It's not off, it's just the GitHub formatting.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org