[ https://issues.apache.org/jira/browse/PHOENIX-6632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485874#comment-17485874 ]
ASF GitHub Bot commented on PHOENIX-6632: ----------------------------------------- stoty commented on a change in pull request #69: URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797649170 ########## File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java ########## @@ -17,42 +17,76 @@ */ package org.apache.phoenix.spark.datasource.v2; -import java.util.Optional; -import java.util.Properties; - -import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader; -import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.sql.SaveMode; +import org.apache.phoenix.spark.SparkSchemaUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.sources.DataSourceRegister; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.WriteSupport; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; /** * Implements the DataSourceV2 api to read and 
write from Phoenix tables */ -public class PhoenixDataSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister { +public class PhoenixDataSource implements TableProvider, DataSourceRegister { private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class); + public static final String TABLE_KEY = "table"; public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier"; public static final String ZOOKEEPER_URL = "zkUrl"; public static final String PHOENIX_CONFIGS = "phoenixconfigs"; + protected StructType schema; + private CaseInsensitiveStringMap options; @Override - public DataSourceReader createReader(DataSourceOptions options) { - return new PhoenixDataSourceReader(options); + public StructType inferSchema(CaseInsensitiveStringMap options){ + if (options.get("table") == null) { Review comment: use the new TABLE_KEY constant everywhere ########## File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java ########## @@ -29,134 +29,104 @@ import org.apache.phoenix.mapreduce.PhoenixInputSplit; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.spark.FilterExpressionCompiler; -import org.apache.phoenix.spark.SparkSchemaUtil; import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.Filter; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.reader.InputPartition; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; -import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns; +import 
org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; import org.apache.spark.sql.types.StructType; -import scala.Tuple3; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.Properties; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; -public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters, - SupportsPushDownRequiredColumns { +public class PhoenixScan implements Scan, Batch { - private final DataSourceOptions options; - private final String tableName; + private final StructType schema; + private final CaseInsensitiveStringMap options; private final String zkUrl; - private final boolean dateAsTimestamp; private final Properties overriddenProps; + private PhoenixDataSourceReadOptions phoenixDataSourceOptions; + private final String tableName; + private String currentScnValue; + private String tenantId; + private boolean splitByStats; + private final String whereClause; - private StructType schema; - private Filter[] pushedFilters = new Filter[]{}; - // derived from pushedFilters - private String whereClause; - - public PhoenixDataSourceReader(DataSourceOptions options) { - if (!options.tableName().isPresent()) { - throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined"); - } - if 
(!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) { - throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined"); - } + PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) { + this.schema = schema; this.options = options; - this.tableName = options.tableName().get(); - this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get(); - this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false); + this.whereClause = whereClause; this.overriddenProps = extractPhoenixHBaseConfFromOptions(options); - setSchema(); + this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL); + tableName = options.get("table"); } - /** - * Sets the schema using all the table columns before any column pruning has been done - */ - private void setSchema() { - try (Connection conn = DriverManager.getConnection( - JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { - List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null); - Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq(); - schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp); + private void populateOverriddenProperties(){ + currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + // Generate splits based off statistics, or just region splits? 
+ splitByStats = options.getBoolean( + PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); + if(currentScnValue != null) { + overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); } - catch (SQLException e) { - throw new RuntimeException(e); + if (tenantId != null){ + overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } } - PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions, - PhoenixInputSplit inputSplit) { - return new PhoenixInputPartition(readOptions, schema, inputSplit); - } - @Override public StructType readSchema() { return schema; } @Override - public Filter[] pushFilters(Filter[] filters) { - Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters); - whereClause = tuple3._1(); - pushedFilters = tuple3._3(); - return tuple3._2(); + public String description() { + return this.getClass().toString(); } @Override - public List<InputPartition<InternalRow>> planInputPartitions() { - Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); - Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); - // Generate splits based off statistics, or just region splits? 
- boolean splitByStats = options.getBoolean( - PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS); - if(currentScnValue.isPresent()) { - overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get()); - } - if (tenantId.isPresent()){ - overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get()); - } + public Batch toBatch() { + return this; + } + + @Override + public InputPartition[] planInputPartitions() { + populateOverriddenProperties(); try (Connection conn = DriverManager.getConnection( JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) { List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>( - Arrays.asList(schema.names()))); + Arrays.asList(schema.names()))); final Statement statement = conn.createStatement(); final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause); if (selectStatement == null){ throw new NullPointerException(); } - final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); // Optimize the query plan so that we potentially use secondary indexes final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); - final Scan scan = queryPlan.getContext().getScan(); + final org.apache.hadoop.hbase.client.Scan scan = queryPlan.getContext().getScan(); // setting the snapshot configuration - Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); - if (snapshotName.isPresent()) - PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection(). - getQueryServices().getConfiguration(), snapshotName.get()); + String snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + + if(snapshotName != null){ + PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().getQueryServices().getConfiguration(), snapshotName); Review comment: Indentation seems to be off. 
########## File path: phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala ########## @@ -48,7 +48,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { .format("phoenix") .options(Map("table" -> "TABLE3", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true")) - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) Review comment: You mentioned that SaveMode is no longer used internally. Does the SaveMode we set here simply get ignored by Spark? ########## File path: phoenix-spark-base/pom.xml ########## @@ -121,7 +121,7 @@ <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> - <version>2.2.4</version> + <version>3.1.0</version> Review comment: I'm not sure how stable this Spark API is. If it is indeed stable for all 3.x, then we'd better use the latest version, hoping that it's faster and more stable for our tests. ########## File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java ########## @@ -17,45 +17,35 @@ */ package org.apache.phoenix.spark.datasource.v2.writer; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; +import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.DataWriterFactory; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.PhysicalWriteInfo; +import org.apache.spark.sql.connector.write.WriterCommitMessage; import org.apache.spark.sql.types.StructType; +import 
java.util.Map; + import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE; -import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL; +import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER; import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions; -public class PhoenixDataSourceWriter implements DataSourceWriter { +public class PhoenixBatchWrite implements BatchWrite { private final PhoenixDataSourceWriteOptions options; + private final LogicalWriteInfo writeInfo; - public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) { - if (!mode.equals(SaveMode.Overwrite)) { Review comment: No SaveMode, no cry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Migrate connectors to Spark-3 > ----------------------------- > > Key: PHOENIX-6632 > URL: https://issues.apache.org/jira/browse/PHOENIX-6632 > Project: Phoenix > Issue Type: Improvement > Components: spark-connector > Affects Versions: connectors-6.0.0 > Reporter: Ashwin Balasubramani > Priority: Major > > With Spark-3, the DataSourceV2 API has changed substantially, and a new > TableProvider interface has been introduced. These changes give the data > source developer more control and better integration with the > Spark optimizer. -- This message was sent by Atlassian Jira (v8.20.1#820001)