This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e5ab11  PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector 
does not commit when mutation batch size is reached
0e5ab11 is described below

commit 0e5ab11c119fbe8be0b5a82c0dfa38308851279e
Author: Chinmay Kulkarni <[email protected]>
AuthorDate: Wed Apr 10 15:50:42 2019 -0700

    PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector does not commit 
when mutation batch size is reached
---
 phoenix-spark/pom.xml                              |  4 +-
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 74 +++++++++++++++++++++-
 .../spark/datasource/v2/PhoenixDataSource.java     | 35 ++--------
 .../v2/reader/PhoenixDataSourceReadOptions.java    | 14 ++--
 .../v2/reader/PhoenixDataSourceReader.java         | 14 ++--
 .../v2/reader/PhoenixInputPartition.java           | 20 ++++--
 .../v2/reader/PhoenixInputPartitionReader.java     | 28 +++++---
 .../v2/writer/PhoenixDataSourceWriteOptions.java   | 37 ++++++-----
 .../datasource/v2/writer/PhoenixDataWriter.java    | 28 ++++++--
 .../v2/writer/PhoenixDataWriterFactory.java        |  6 +-
 .../v2/writer/PhoenixDatasourceWriter.java         | 45 ++++++++++++-
 .../phoenix/spark/FilterExpressionCompiler.scala   |  3 +-
 .../org/apache/phoenix/spark/PhoenixRDD.scala      |  5 +-
 .../phoenix/spark/PhoenixRecordWritable.scala      |  2 +-
 .../org/apache/phoenix/spark/PhoenixRelation.scala |  2 +-
 .../org/apache/phoenix/spark/SparkSchemaUtil.scala | 13 +++-
 .../datasources/jdbc/PhoenixJdbcDialect.scala      |  2 +-
 .../execution/datasources/jdbc/SparkJdbcUtil.scala |  5 +-
 .../datasource/v2/PhoenixTestingDataSource.java    | 53 ++++++++++++++++
 .../v2/reader/PhoenixTestingDataSourceReader.java} | 21 +++---
 .../v2/reader/PhoenixTestingInputPartition.java}   | 20 ++----
 .../PhoenixTestingInputPartitionReader.java}       | 25 ++++----
 .../v2/writer/PhoenixTestingDataSourceWriter.java} | 30 ++++-----
 .../v2/writer/PhoenixTestingDataWriter.java}       | 30 ++++-----
 .../writer/PhoenixTestingDataWriterFactory.java}   | 12 ++--
 .../writer/PhoenixTestingWriterCommitMessage.java} | 20 +++---
 26 files changed, 367 insertions(+), 181 deletions(-)

diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 0bbd983..56d2111 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -478,8 +478,8 @@
   </dependencies>
 
   <build>
-    <testSourceDirectory>src/it/scala</testSourceDirectory>
-    
<testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
+    
<testResources><testResource><directory>src/test/resources</directory></testResource></testResources>
     <plugins>
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
diff --git 
a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala 
b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b40b638..58910ce 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -16,12 +16,17 @@ package org.apache.phoenix.spark
 import java.sql.DriverManager
 import java.util.Date
 
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, 
PhoenixTestingDataSource}
+import 
org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import 
org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, 
IntegerType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
 
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 /**
@@ -249,6 +254,30 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     count shouldEqual 1L
   }
 
+  test("Can use extraOptions to set configs for workers during reads") {
+    // Pass in true, so we will get null when fetching the current row, 
leading to an NPE
+    var extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW 
+ "=true"
+    var rdd = spark.sqlContext.read
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> 
quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+
+    // Expect to get a NullPointerException in the executors
+    var error = intercept[SparkException] {
+      rdd.take(2)(0)(1)
+    }
+    assert(error.getCause.isInstanceOf[NullPointerException])
+
+    // Pass in false, so we will get the expected rows
+    extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW + 
"=false"
+    rdd = spark.sqlContext.read
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> 
quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+    val stringValue = rdd.take(2)(0)(1)
+    stringValue shouldEqual "test_row_1"
+  }
+
   test("Can save to phoenix table") {
     val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
 
@@ -282,6 +311,47 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
     }
   }
 
+  test("Can use extraOptions to set configs for workers during writes") {
+    val totalRecords = 100
+    val upsertBatchSize = 5
+
+    val records = new mutable.MutableList[Row]
+    for (x <- 1 to totalRecords) {
+      records += Row(x.toLong, x.toString, x)
+    }
+    val dataSet = records.toList
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("COL1", StringType),
+        StructField("COL2", IntegerType)))
+
+    // Distribute the dataset into an RDD with just 1 partition so we use only 
1 executor.
+    // This makes it easy to deterministically count the batched commits from 
that executor
+    // since it corresponds to exactly 1 input partition. In case of multiple 
executors with
+    // an uneven distribution of input partitions, if
+    // (number of records in that partition) % batchSize != 0, some updates 
would also be committed
+    // via PhoenixDataWriter#commit rather than the batch commits via 
PhoenixDataWriter#write
+    // and those would thus, not be counted by PhoenixTestingDataWriter.
+    val rowRDD = spark.sparkContext.parallelize(dataSet, 1)
+
+    // Apply the schema to the RDD.
+    val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+    val extraOptions =  PhoenixConfigurationUtil.UPSERT_BATCH_SIZE + "=" + 
upsertBatchSize.toString
+
+    // Initially, this should be zero
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual 0
+    df.write
+      .format(PhoenixTestingDataSource.TEST_SOURCE)
+      .options(Map("table" -> "OUTPUT_TEST_TABLE", 
PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Verify the number of times batched updates are committed via DataWriters
+    PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual 
totalRecords/upsertBatchSize
+  }
+
   test("Can save dates to Phoenix using java.sql.Date") {
     val date = java.sql.Date.valueOf("2016-09-30")
 
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index 02b5edf..21250af 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -21,11 +21,8 @@ import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.log4j.Logger;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import 
org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -52,20 +49,9 @@ public class PhoenixDataSource  implements DataSourceV2,  
ReadSupport, WriteSupp
     }
 
     @Override
-    public Optional<DataSourceWriter> createWriter(String writeUUID, 
StructType schema, SaveMode mode,
-            DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite 
is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + 
DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + 
PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
-
-        PhoenixDataSourceWriteOptions writeOptions = 
createPhoenixDataSourceWriteOptions(options, schema);
-        return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+    public Optional<DataSourceWriter> createWriter(String writeUUID, 
StructType schema,
+            SaveMode mode, DataSourceOptions options) {
+        return Optional.of(new PhoenixDataSourceWriter(mode, schema, options));
     }
 
     /**
@@ -102,19 +88,6 @@ public class PhoenixDataSource  implements DataSourceV2,  
ReadSupport, WriteSupp
         return confToSet;
     }
 
-    private PhoenixDataSourceWriteOptions 
createPhoenixDataSourceWriteOptions(DataSourceOptions options,
-            StructType schema) {
-        String scn = 
options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
-        String tenantId = 
options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
-        String zkUrl = options.get(ZOOKEEPER_URL).get();
-        boolean skipNormalizingIdentifier = 
options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
-        return new 
PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
-                
.setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
-                .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
-                
.setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
-                .build();
-    }
-
     @Override
     public String shortName() {
         return "phoenix";
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 70062c8..67343d4 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.Properties;
 
-public class PhoenixDataSourceReadOptions implements Serializable {
+class PhoenixDataSourceReadOptions implements Serializable {
 
     private final String tenantId;
     private final String zkUrl;
@@ -30,7 +30,7 @@ public class PhoenixDataSourceReadOptions implements 
Serializable {
     private final String selectStatement;
     private final Properties overriddenProps;
 
-    public PhoenixDataSourceReadOptions(String zkUrl, String scn, String 
tenantId,
+    PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
             String selectStatement, Properties overriddenProps) {
         Preconditions.checkNotNull(overriddenProps);
         this.zkUrl = zkUrl;
@@ -40,23 +40,23 @@ public class PhoenixDataSourceReadOptions implements 
Serializable {
         this.overriddenProps = overriddenProps;
     }
 
-    public String getSelectStatement() {
+    String getSelectStatement() {
         return selectStatement;
     }
 
-    public String getScn() {
+    String getScn() {
         return scn;
     }
 
-    public String getZkUrl() {
+    String getZkUrl() {
         return zkUrl;
     }
 
-    public String getTenantId() {
+    String getTenantId() {
         return tenantId;
     }
 
-    public Properties getOverriddenProps() {
+    Properties getOverriddenProps() {
         return overriddenProps;
     }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 8476509..18e304b 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -85,7 +85,7 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
         }
         this.options = options;
         this.tableName = options.tableName().get();
-        this.zkUrl = options.get("zkUrl").get();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
         this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
         setSchema();
@@ -106,6 +106,11 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
         }
     }
 
+    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions 
readOptions,
+            PhoenixInputSplit inputSplit) {
+        return new PhoenixInputPartition(readOptions, schema, inputSplit);
+    }
+
     @Override
     public StructType readSchema() {
         return schema;
@@ -175,19 +180,18 @@ public class PhoenixDataSourceReader implements 
DataSourceReader, SupportsPushDo
 
                 // Get the region size
                 long regionSize = sizeCalculator.getRegionSize(
-                        location.getRegionInfo().getRegionName()
-                );
+                        location.getRegionInfo().getRegionName());
 
                 PhoenixDataSourceReadOptions phoenixDataSourceOptions =
                         new PhoenixDataSourceReadOptions(zkUrl, 
currentScnValue.orElse(null),
                                 tenantId.orElse(null), selectStatement, 
overriddenProps);
                 if (splitByStats) {
                     for (Scan aScan : scans) {
-                        partitions.add(new 
PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                        
partitions.add(getInputPartition(phoenixDataSourceOptions,
                                 new 
PhoenixInputSplit(Collections.singletonList(aScan), regionSize, 
regionLocation)));
                     }
                 } else {
-                    partitions.add(new 
PhoenixInputPartition(phoenixDataSourceOptions, schema,
+                    partitions.add(getInputPartition(phoenixDataSourceOptions,
                             new PhoenixInputSplit(scans, regionSize, 
regionLocation)));
                 }
             }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
index 624ff0f..e4adc07 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -26,16 +26,28 @@ import org.apache.spark.sql.types.StructType;
 
 public class PhoenixInputPartition implements InputPartition<InternalRow> {
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
-    private PhoenixDataSourceReadOptions options;
+    private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private final StructType schema;
+    private final PhoenixDataSourceReadOptions options;
 
-    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, 
StructType schema, PhoenixInputSplit phoenixInputSplit) {
+    PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType 
schema, PhoenixInputSplit phoenixInputSplit) {
         this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
         this.schema = schema;
         this.options = options;
     }
 
+    PhoenixDataSourceReadOptions getOptions() {
+        return options;
+    }
+
+    StructType getSchema() {
+        return schema;
+    }
+
+    SerializableWritable<PhoenixInputSplit> getPhoenixInputSplit() {
+        return phoenixInputSplit;
+    }
+
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
         return new PhoenixInputPartitionReader(options, schema, 
phoenixInputSplit);
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 6f6413b..49f6c13 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -62,25 +62,30 @@ import static 
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixInputPartitionReader implements 
InputPartitionReader<InternalRow>  {
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
+    private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+    private final StructType schema;
+    private final PhoenixDataSourceReadOptions options;
     private Iterator<InternalRow> iterator;
     private PhoenixResultSet resultSet;
     private InternalRow currentRow;
-    private PhoenixDataSourceReadOptions options;
 
-    public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, 
StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+    PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, 
StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
         this.options = options;
         this.phoenixInputSplit = phoenixInputSplit;
         this.schema = schema;
         initialize();
     }
 
+    Properties getOverriddenPropsFromOptions() {
+        return options.getOverriddenProps();
+    }
+
     private QueryPlan getQueryPlan() throws SQLException {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
-        Properties overridingProps = options.getOverriddenProps();
+        Properties overridingProps = getOverriddenPropsFromOptions();
         if (scn != null) {
             overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
         }
@@ -95,8 +100,7 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
 
             final PhoenixStatement pstmt = 
statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary 
indexes
-            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            return queryPlan;
+            return pstmt.optimizeQuery(selectStatement);
         }
     }
 
@@ -114,7 +118,8 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
             ConnectionQueryServices services = 
queryPlan.getContext().getConnection().getQueryServices();
             services.clearTableRegionCache(tableNameBytes);
 
-            long renewScannerLeaseThreshold = 
queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            long renewScannerLeaseThreshold = 
queryPlan.getContext().getConnection()
+                    .getQueryServices().getRenewLeaseThresholdMilliSeconds();
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we 
encounter a split. ref: PHOENIX-2599
                 
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, 
Bytes.toBytes(true));
@@ -131,13 +136,16 @@ public class PhoenixInputPartitionReader implements 
InputPartitionReader<Interna
                 peekingResultIterator = 
LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }
-            ResultIterator iterator = queryPlan.useRoundRobinIterator() ? 
RoundRobinResultIterator.newIterator(iterators, queryPlan) : 
ConcatResultIterator.newIterator(iterators);
+            ResultIterator iterator = queryPlan.useRoundRobinIterator() ?
+                    RoundRobinResultIterator.newIterator(iterators, queryPlan) 
:
+                    ConcatResultIterator.newIterator(iterators);
             if (queryPlan.getContext().getSequenceManager().getSequenceCount() 
> 0) {
                 iterator = new SequenceResultIterator(iterator, 
queryPlan.getContext().getSequenceManager());
             }
             // Clone the row projector as it's not thread safe and would be 
used simultaneously by
             // multiple threads otherwise.
-            this.resultSet = new PhoenixResultSet(iterator, 
queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+            this.resultSet = new PhoenixResultSet(iterator, 
queryPlan.getProjector().cloneIfNecessary(),
+                    queryPlan.getContext());
             this.iterator = 
SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new 
InputMetrics());
         }
         catch (SQLException e) {
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 434f13c..c130db8 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.StructType;
 import java.io.Serializable;
 import java.util.Properties;
 
-public class PhoenixDataSourceWriteOptions implements Serializable {
+class PhoenixDataSourceWriteOptions implements Serializable {
 
     private final String tableName;
     private final String zkUrl;
@@ -36,6 +36,9 @@ public class PhoenixDataSourceWriteOptions implements 
Serializable {
     private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, 
String scn,
             String tenantId, StructType schema, boolean 
skipNormalizingIdentifier,
             Properties overriddenProps) {
+        Preconditions.checkNotNull(tableName);
+        Preconditions.checkNotNull(zkUrl);
+        Preconditions.checkNotNull(schema);
         Preconditions.checkNotNull(overriddenProps);
         this.tableName = tableName;
         this.zkUrl = zkUrl;
@@ -46,35 +49,35 @@ public class PhoenixDataSourceWriteOptions implements 
Serializable {
         this.overriddenProps = overriddenProps;
     }
 
-    public String getScn() {
+    String getScn() {
         return scn;
     }
 
-    public String getZkUrl() {
+    String getZkUrl() {
         return zkUrl;
     }
 
-    public String getTenantId() {
+    String getTenantId() {
         return tenantId;
     }
 
-    public StructType getSchema() {
+    StructType getSchema() {
         return schema;
     }
 
-    public String getTableName() {
+    String getTableName() {
         return tableName;
     }
 
-    public boolean skipNormalizingIdentifier() {
+    boolean skipNormalizingIdentifier() {
         return skipNormalizingIdentifier;
     }
 
-    public Properties getOverriddenProps() {
+    Properties getOverriddenProps() {
         return overriddenProps;
     }
 
-    public static class Builder {
+    static class Builder {
         private String tableName;
         private String zkUrl;
         private String scn;
@@ -83,42 +86,42 @@ public class PhoenixDataSourceWriteOptions implements 
Serializable {
         private boolean skipNormalizingIdentifier;
         private Properties overriddenProps = new Properties();
 
-        public Builder setTableName(String tableName) {
+        Builder setTableName(String tableName) {
             this.tableName = tableName;
             return this;
         }
 
-        public Builder setZkUrl(String zkUrl) {
+        Builder setZkUrl(String zkUrl) {
             this.zkUrl = zkUrl;
             return this;
         }
 
-        public Builder setScn(String scn) {
+        Builder setScn(String scn) {
             this.scn = scn;
             return this;
         }
 
-        public Builder setTenantId(String tenantId) {
+        Builder setTenantId(String tenantId) {
             this.tenantId = tenantId;
             return this;
         }
 
-        public Builder setSchema(StructType schema) {
+        Builder setSchema(StructType schema) {
             this.schema = schema;
             return this;
         }
 
-        public Builder setSkipNormalizingIdentifier(boolean 
skipNormalizingIdentifier) {
+        Builder setSkipNormalizingIdentifier(boolean 
skipNormalizingIdentifier) {
             this.skipNormalizingIdentifier = skipNormalizingIdentifier;
             return this;
         }
 
-        public Builder setOverriddenProps(Properties overriddenProps) {
+        Builder setOverriddenProps(Properties overriddenProps) {
             this.overriddenProps = overriddenProps;
             return this;
         }
 
-        public PhoenixDataSourceWriteOptions build() {
+        PhoenixDataSourceWriteOptions build() {
             return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, 
tenantId, schema,
                     skipNormalizingIdentifier, overriddenProps);
         }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 6793673..04670d5 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.log4j.Logger;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -40,16 +41,22 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
 import com.google.common.collect.Lists;
+
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
+    private static final Logger logger = 
Logger.getLogger(PhoenixDataWriter.class);
     private final StructType schema;
     private final Connection conn;
     private final PreparedStatement statement;
+    private final long batchSize;
+    private long numRecords = 0;
 
-    public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+    PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
         String scn = options.getScn();
         String tenantId = options.getTenantId();
         String zkUrl = options.getZkUrl();
@@ -66,15 +73,21 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
                     overridingProps);
             List<String> colNames = 
Lists.newArrayList(options.getSchema().names());
             if (!options.skipNormalizingIdentifier()){
-                colNames = colNames.stream().map(colName -> 
SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+                colNames = 
colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
             }
             String upsertSql = 
QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
             this.statement = this.conn.prepareStatement(upsertSql);
+            this.batchSize = 
Long.valueOf(overridingProps.getProperty(UPSERT_BATCH_SIZE,
+                    String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
     }
 
+    void commitBatchUpdates() throws SQLException {
+        conn.commit();
+    }
+
     @Override
     public void write(InternalRow internalRow) throws IOException {
         try {
@@ -90,14 +103,21 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
                 }
                 ++i;
             }
+            numRecords++;
             statement.execute();
+            if (numRecords % batchSize == 0) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("commit called on a batch of size : " + 
batchSize);
+                }
+                commitBatchUpdates();
+            }
         } catch (SQLException e) {
             throw new IOException("Exception while executing Phoenix prepared 
statement", e);
         }
     }
 
     @Override
-    public WriterCommitMessage commit() throws IOException {
+    public WriterCommitMessage commit() {
         try {
             conn.commit();
         } catch (SQLException e) {
@@ -115,6 +135,6 @@ public class PhoenixDataWriter implements 
DataWriter<InternalRow> {
     }
 
     @Override
-    public void abort() throws IOException {
+    public void abort() {
     }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
index f7654e3..6fce8fe 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -25,10 +25,14 @@ public class PhoenixDataWriterFactory implements 
DataWriterFactory<InternalRow>
 
     private final PhoenixDataSourceWriteOptions options;
 
-    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+    PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
         this.options = options;
     }
 
+    PhoenixDataSourceWriteOptions getOptions() {
+        return options;
+    }
+
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long 
taskId, long epochId) {
         return new PhoenixDataWriter(options);
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 9d713b8..31a3065 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -17,17 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
 
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import static 
org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
+import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static 
org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+
+public class PhoenixDataSourceWriter implements DataSourceWriter {
 
     private final PhoenixDataSourceWriteOptions options;
 
-    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, 
DataSourceOptions options) {
+        if (!mode.equals(SaveMode.Overwrite)) {
+            throw new RuntimeException("SaveMode other than SaveMode.OverWrite 
is not supported");
+        }
+        if (!options.tableName().isPresent()) {
+            throw new RuntimeException("No Phoenix option " + 
DataSourceOptions.TABLE_KEY + " defined");
+        }
+        if (!options.get(ZOOKEEPER_URL).isPresent()) {
+            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + 
" defined");
+        }
+        this.options = createPhoenixDataSourceWriteOptions(options, schema);
     }
 
     @Override
@@ -47,4 +65,25 @@ public class PhoenixDatasourceWriter implements 
DataSourceWriter {
     @Override
     public void abort(WriterCommitMessage[] messages) {
     }
+
+    PhoenixDataSourceWriteOptions getOptions() {
+        return options;
+    }
+
+    private PhoenixDataSourceWriteOptions 
createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+                                                                              
StructType schema) {
+        String scn = options.get(CURRENT_SCN_VALUE).orElse(null);
+        String tenantId = 
options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+        String zkUrl = options.get(ZOOKEEPER_URL).get();
+        boolean skipNormalizingIdentifier = 
options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+        return new PhoenixDataSourceWriteOptions.Builder()
+                .setTableName(options.tableName().get())
+                .setZkUrl(zkUrl)
+                .setScn(scn)
+                .setTenantId(tenantId)
+                .setSchema(schema)
+                .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
+                
.setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+                .build();
+    }
 }
diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 1d6973c..a2ec2dc 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -23,7 +23,8 @@ import java.text.Format
 
 import org.apache.phoenix.util.{DateUtil, SchemaUtil}
 import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, 
GreaterThanOrEqual, In,
+IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, 
StringEndsWith, StringStartsWith}
 
 class FilterExpressionCompiler() {
 
diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 34033b7..89d808d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -16,16 +16,15 @@ package org.apache.phoenix.spark
 import java.sql.DriverManager
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.hadoop.hbase.HConstants
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.query.HBaseFactoryProvider
-import org.apache.spark._
+import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 import scala.collection.JavaConverters._
diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 6d4c4cc..66c347e 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -15,7 +15,7 @@ package org.apache.phoenix.spark
 
 import java.sql.{PreparedStatement, ResultSet}
 import org.apache.hadoop.mapreduce.lib.db.DBWritable
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PDataType, PDate, PVarbinary, 
PhoenixArray}
 import org.apache.phoenix.util.ColumnInfo
 import org.joda.time.DateTime
 import scala.collection.{mutable, immutable}
diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 2f6ea8c..aacd460 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,7 +19,7 @@ package org.apache.phoenix.spark
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
 
diff --git 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index f69e988..363acf8 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -18,9 +18,18 @@
 package org.apache.phoenix.spark
 
 import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PBinaryArray, PBoolean, 
PBooleanArray, PChar,
+PCharArray, PDate, PDateArray, PDecimal, PDecimalArray, PDouble, PDoubleArray, 
PFloat, PFloatArray,
+PInteger, PIntegerArray, PLong, PLongArray, PSmallint, PSmallintArray, PTime, 
PTimeArray,
+PTimestamp, PTimestampArray, PTinyint, PTinyintArray, PUnsignedDate, 
PUnsignedDateArray,
+PUnsignedDouble, PUnsignedDoubleArray, PUnsignedFloat, PUnsignedFloatArray, 
PUnsignedInt,
+PUnsignedIntArray, PUnsignedLong, PUnsignedLongArray, PUnsignedSmallint, 
PUnsignedSmallintArray,
+PUnsignedTime, PUnsignedTimeArray, PUnsignedTimestamp, 
PUnsignedTimestampArray, PUnsignedTinyint,
+PUnsignedTinyintArray, PVarbinary, PVarbinaryArray, PVarchar, PVarcharArray}
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteType, DataType, DateType,
+DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, 
StringType, StructField,
+StructType, TimestampType}
 
 object SparkSchemaUtil {
 
diff --git 
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
 
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
index 01437f0..f89a451 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType}
 
 private object PhoenixJdbcDialect  extends JdbcDialect {
 
diff --git 
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
 
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index eac483a..50cdbf5 100644
--- 
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++ 
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -27,10 +27,11 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteType, DataType, DateType,
+Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, 
ShortType, StringType,
+StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
diff --git 
a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
new file mode 100644
index 0000000..6a2299e
--- /dev/null
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import 
org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingDataSourceReader;
+import 
org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+public class PhoenixTestingDataSource extends PhoenixDataSource {
+
+    public static final String TEST_SOURCE =
+            "org.apache.phoenix.spark.datasource.v2.PhoenixTestingDataSource";
+
+    // Override to return a test DataSourceReader
+    @Override
+    public DataSourceReader createReader(DataSourceOptions options) {
+        return new PhoenixTestingDataSourceReader(options);
+    }
+
+    // Override to return a test DataSourceWriter
+    @Override
+    public Optional<DataSourceWriter> createWriter(String writeUUID, 
StructType schema,
+            SaveMode mode, DataSourceOptions options) {
+        return Optional.of(new PhoenixTestingDataSourceWriter(mode, schema, 
options));
+    }
+
+    @Override
+    public String shortName() {
+        return "phoenixTesting";
+    }
+
+}
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
similarity index 57%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
index f7654e3..0116255 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
@@ -15,22 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.spark.datasource.v2.writer;
+package org.apache.phoenix.spark.datasource.v2.reader;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
-public class PhoenixDataWriterFactory implements 
DataWriterFactory<InternalRow> {
+public class PhoenixTestingDataSourceReader extends PhoenixDataSourceReader {
 
-    private final PhoenixDataSourceWriteOptions options;
-
-    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    public PhoenixTestingDataSourceReader(DataSourceOptions options) {
+        super(options);
     }
 
+    // Override to return a test InputPartition
     @Override
-    public DataWriter<InternalRow> createDataWriter(int partitionId, long 
taskId, long epochId) {
-        return new PhoenixDataWriter(options);
+    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions 
readOptions,
+            PhoenixInputSplit inputSplit) {
+        return new PhoenixTestingInputPartition(readOptions, readSchema(), 
inputSplit);
     }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
similarity index 62%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
index 624ff0f..eca7fc7 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
@@ -18,27 +18,21 @@
 package org.apache.phoenix.spark.datasource.v2.reader;
 
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
-import org.apache.spark.SerializableWritable;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 
-public class PhoenixInputPartition implements InputPartition<InternalRow> {
+public class PhoenixTestingInputPartition extends PhoenixInputPartition {
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
-    private PhoenixDataSourceReadOptions options;
-
-    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, 
StructType schema, PhoenixInputSplit phoenixInputSplit) {
-        this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
-        this.schema = schema;
-        this.options = options;
+    PhoenixTestingInputPartition(PhoenixDataSourceReadOptions options, 
StructType schema,
+            PhoenixInputSplit phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
     }
 
+    // Override to return a test InputPartitionReader for testing on the 
executor-side
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new PhoenixInputPartitionReader(options, schema, 
phoenixInputSplit);
+        return new PhoenixTestingInputPartitionReader(getOptions(), 
getSchema(),
+                getPhoenixInputSplit());
     }
-
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
similarity index 55%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
index 624ff0f..c85dc44 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
@@ -20,25 +20,26 @@ package org.apache.phoenix.spark.datasource.v2.reader;
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.spark.SerializableWritable;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 
-public class PhoenixInputPartition implements InputPartition<InternalRow> {
+import java.util.Properties;
 
-    private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
-    private StructType schema;
-    private PhoenixDataSourceReadOptions options;
+public class PhoenixTestingInputPartitionReader extends 
PhoenixInputPartitionReader {
 
-    public PhoenixInputPartition(PhoenixDataSourceReadOptions options, 
StructType schema, PhoenixInputSplit phoenixInputSplit) {
-        this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
-        this.schema = schema;
-        this.options = options;
+    // A test property which is used to modify the current row returned by the 
test input
+    // partition reader in order to check properties passed from the driver to 
executors
+    public static final String RETURN_NULL_CURR_ROW = "return.null.curr.row";
+
+    PhoenixTestingInputPartitionReader(PhoenixDataSourceReadOptions options, 
StructType schema,
+            SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+        super(options, schema, phoenixInputSplit);
     }
 
+    // Override to return null rather than the actual row based on a property 
passed to the executor
     @Override
-    public InputPartitionReader<InternalRow> createPartitionReader() {
-        return new PhoenixInputPartitionReader(options, schema, 
phoenixInputSplit);
+    public InternalRow get() {
+        Properties props = getOverriddenPropsFromOptions();
+        return Boolean.valueOf(props.getProperty(RETURN_NULL_CURR_ROW)) ? null 
: super.get();
     }
 
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
similarity index 58%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
index 9d713b8..cbba62b 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
@@ -17,34 +17,34 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
 
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+public class PhoenixTestingDataSourceWriter extends PhoenixDataSourceWriter {
 
-    private final PhoenixDataSourceWriteOptions options;
+    // Used to keep track of the total number of batches committed across all 
executors
+    public static int TOTAL_BATCHES_COMMITTED_COUNT = 0;
 
-    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    public PhoenixTestingDataSourceWriter(SaveMode mode, StructType schema,
+            DataSourceOptions options) {
+        super(mode, schema, options);
     }
 
+    // Override to return a test DataWriterFactory
     @Override
     public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
-    }
-
-    @Override
-    public boolean useCommitCoordinator() {
-        return false;
+        return new PhoenixTestingDataWriterFactory(getOptions());
     }
 
+    // Override to sum up the total number of batches committed across all 
executors
     @Override
     public void commit(WriterCommitMessage[] messages) {
-    }
-
-    @Override
-    public void abort(WriterCommitMessage[] messages) {
+        for (WriterCommitMessage message : messages) {
+            TOTAL_BATCHES_COMMITTED_COUNT += 
Integer.parseInt(message.toString());
+        }
     }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
similarity index 56%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
index 9d713b8..75aa447 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
@@ -17,34 +17,30 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import java.sql.SQLException;
 
-    private final PhoenixDataSourceWriteOptions options;
+public class PhoenixTestingDataWriter extends PhoenixDataWriter {
 
-    public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
-    }
+    private long numBatchesCommitted = 0;
 
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixTestingDataWriter(PhoenixDataSourceWriteOptions options) {
+        super(options);
     }
 
+    // Override to also count the number of times we call this method to test 
upsert batch commits
     @Override
-    public boolean useCommitCoordinator() {
-        return false;
+    void commitBatchUpdates() throws SQLException {
+        super.commitBatchUpdates();
+        numBatchesCommitted++;
     }
 
+    // Override to return a test WriterCommitMessage
     @Override
-    public void commit(WriterCommitMessage[] messages) {
+    public WriterCommitMessage commit() {
+        super.commit();
+        return new PhoenixTestingWriterCommitMessage(numBatchesCommitted);
     }
 
-    @Override
-    public void abort(WriterCommitMessage[] messages) {
-    }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
similarity index 76%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
index f7654e3..4288128 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
@@ -19,18 +19,16 @@ package org.apache.phoenix.spark.datasource.v2.writer;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 
-public class PhoenixDataWriterFactory implements 
DataWriterFactory<InternalRow> {
+public class PhoenixTestingDataWriterFactory extends PhoenixDataWriterFactory {
 
-    private final PhoenixDataSourceWriteOptions options;
-
-    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    PhoenixTestingDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+        super(options);
     }
 
+    // Override to return a test DataWriter
     @Override
     public DataWriter<InternalRow> createDataWriter(int partitionId, long 
taskId, long epochId) {
-        return new PhoenixDataWriter(options);
+        return new PhoenixTestingDataWriter(getOptions());
     }
 }
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
similarity index 57%
copy from 
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to 
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
index f7654e3..1e43e07 100644
--- 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++ 
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
@@ -17,20 +17,22 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
-public class PhoenixDataWriterFactory implements 
DataWriterFactory<InternalRow> {
+class PhoenixTestingWriterCommitMessage implements WriterCommitMessage {
 
-    private final PhoenixDataSourceWriteOptions options;
+    private final long numBatchesCommitted;
 
-    public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
-        this.options = options;
+    PhoenixTestingWriterCommitMessage(long numBatchesCommitted) {
+        this.numBatchesCommitted = numBatchesCommitted;
     }
 
+    // Override to keep track of the number of batches committed by the 
corresponding DataWriter
+    // in the WriterCommitMessage, so we can observe this value in the driver 
when we call
+    // {@link PhoenixTestingDataSourceWriter#commit(WriterCommitMessage[])}
     @Override
-    public DataWriter<InternalRow> createDataWriter(int partitionId, long 
taskId, long epochId) {
-        return new PhoenixDataWriter(options);
+    public String toString() {
+        return String.valueOf(this.numBatchesCommitted);
     }
+
 }

Reply via email to