ashwinb1998 commented on a change in pull request #69: URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455447
########## File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java ########## @@ -17,45 +17,35 @@ */ package org.apache.phoenix.spark.datasource.v2.writer; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; import org.apache.spark.sql.types.StructType; +import java.util.Map; + import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; +import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; -public class PhoenixDataSourceWriter implements DataSourceWriter { +public class PhoenixBatchWrite implements BatchWrite { private final PhoenixDataSourceWriteOptions options; + private final LogicalWriteInfo writeInfo; - public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) { - if (!mode.equals(SaveMode.Overwrite)) { Review 
comment: SaveMode has been removed from the DataSourceV2 API and hence there is no parameter SaveMode that can be used here. Reference: https://issues.apache.org/jira/browse/SPARK-25531 It is a requirement though, that for the writes, SaveMode should be "Append". ########## File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java ########## @@ -29,134 +29,97 @@ import org.apache.phoenix.mapreduce.PhoenixInputSplit; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.spark.FilterExpressionCompiler; -import org.apache.phoenix.spark.SparkSchemaUtil; import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.types.StructType; -import scala.Tuple3; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import 
java.util.Optional; import java.util.Properties; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters, - SupportsPushDownRequiredColumns { +public class PhoenixScan implements Scan, Batch { - private final DataSourceOptions options; - private final String tableName; + private final StructType schema; + private final CaseInsensitiveStringMap options; private final String zkUrl; - private final boolean dateAsTimestamp; private final Properties overriddenProps; + private PhoenixDataSourceReadOptions phoenixDataSourceOptions; + private final String tableName; + private String currentScnValue; + private String tenantId; + private boolean splitByStats; + private final String whereClause; - private StructType schema; - private Filter[] pushedFilters = new Filter[]{}; - // derived from pushedFilters - private String whereClause; - - public PhoenixDataSourceReader(DataSourceOptions options) { - if (!options.tableName().isPresent()) { - throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); - } - if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) { - throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined"); - } + PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) { + this.schema = schema; this.options = options; - this.tableName = options.tableName().get(); - this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get(); - this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false); + this.whereClause = whereClause; this.overriddenProps = extractPhoenixHBaseConfFromOptions(options); - setSchema(); + this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL); + 
tableName = options.get("table"); } - /** - * Sets the schema using all the table columns before any column pruning has been done - */ - private void setSchema() { - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { - List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); - Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); - schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + private void populateOverriddenProperties(){ + currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + // Generate splits based off statistics, or just region splits? + splitByStats = options.getBoolean( + PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); + if(currentScnValue != null) { + overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); } - catch (SQLException e) { - throw new RuntimeException(e); + if (tenantId != null){ + overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } } - PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions, - PhoenixInputSplit inputSplit) { - return new PhoenixInputPartition(readOptions, schema, inputSplit); - } - @Override public StructType readSchema() { return schema; } @Override - public Filter[] pushFilters(Filter[] filters) { - Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters); - whereClause = tuple3._1(); - pushedFilters = tuple3._3(); - return tuple3._2(); + public String description() { + return this.getClass().toString(); } @Override - public List<InputPartition<InternalRow>> planInputPartitions() { - Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); - 
Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); - // Generate splits based off statistics, or just region splits? - boolean splitByStats = options.getBoolean( - PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); - if(currentScnValue.isPresent()) { - overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get()); - } - if (tenantId.isPresent()){ - overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get()); - } + public Batch toBatch() { + return this; + } + + @Override + public InputPartition[] planInputPartitions() { + populateOverriddenProperties(); try (Connection conn = DriverManager.getConnection( JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>( - Arrays.asList(schema.names()))); + Arrays.asList(schema.names()))); final Statement statement = conn.createStatement(); final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause); if (selectStatement == null){ throw new NullPointerException(); } - final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); // Optimize the query plan so that we potentially use secondary indexes final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); - final Scan scan = queryPlan.getContext().getScan(); - - // setting the snapshot configuration - Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); Review comment: Woops, somehow missed it, will add it now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org