ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455447



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {

Review comment:
       SaveMode has been removed from the DataSourceV2 API, so there is no SaveMode parameter that can be used here. Reference: https://issues.apache.org/jira/browse/SPARK-25531
   It is still a requirement, though, that writes use SaveMode "Append".

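To make the Append requirement concrete, here is a minimal, hypothetical caller-side sketch (the format short name, option keys, path, and table name are illustrative assumptions, not taken from this PR): with the Spark 3 connector API the writer never receives a SaveMode, so the caller requests Append explicitly and any other mode is expected to be rejected.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PhoenixAppendWriteSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("phoenix-append-sketch").getOrCreate();
        // Any source DataFrame; the parquet path here is a placeholder.
        Dataset<Row> df = spark.read().parquet("/tmp/input");

        df.write()
          .format("phoenix")               // Phoenix Spark connector (short name assumed)
          .option("table", "MY_TABLE")     // hypothetical target Phoenix table
          .option("zkUrl", "zkhost:2181")  // ZooKeeper quorum (see PhoenixDataSource.ZOOKEEPER_URL)
          .mode(SaveMode.Append)           // only Append is supported for Phoenix writes
          .save();

        spark.stop();
    }
}
```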
##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-
-            // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);

Review comment:
       Whoops, somehow missed it; will add it back now.
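For reference, a hedged sketch of how the missed snapshot handling could be restored inside `planInputPartitions()`, mirroring the removed `DataSourceOptions` code above. Here `options` and `overriddenProps` are the PhoenixScan fields shown in the diff; applying the value via the overridden properties is an assumption for illustration, not this PR's actual fix.

```java
// Sketch only: re-read the snapshot option from the CaseInsensitiveStringMap,
// as the old DataSourceOptions-based code did, and propagate it when present.
// Pushing it into overriddenProps is an assumed placement, not the PR's change.
String snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
if (snapshotName != null) {
    overriddenProps.put(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY, snapshotName);
}
```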



