This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 0e5ab11 PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector does not commit when mutation batch size is reached
0e5ab11 is described below
commit 0e5ab11c119fbe8be0b5a82c0dfa38308851279e
Author: Chinmay Kulkarni <[email protected]>
AuthorDate: Wed Apr 10 15:50:42 2019 -0700
PHOENIX-5232: PhoenixDataWriter in Phoenix-Spark connector does not commit when mutation batch size is reached
---
phoenix-spark/pom.xml | 4 +-
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 74 +++++++++++++++++++++-
.../spark/datasource/v2/PhoenixDataSource.java | 35 ++--------
.../v2/reader/PhoenixDataSourceReadOptions.java | 14 ++--
.../v2/reader/PhoenixDataSourceReader.java | 14 ++--
.../v2/reader/PhoenixInputPartition.java | 20 ++++--
.../v2/reader/PhoenixInputPartitionReader.java | 28 +++++---
.../v2/writer/PhoenixDataSourceWriteOptions.java | 37 ++++++-----
.../datasource/v2/writer/PhoenixDataWriter.java | 28 ++++++--
.../v2/writer/PhoenixDataWriterFactory.java | 6 +-
.../v2/writer/PhoenixDatasourceWriter.java | 45 ++++++++++++-
.../phoenix/spark/FilterExpressionCompiler.scala | 3 +-
.../org/apache/phoenix/spark/PhoenixRDD.scala | 5 +-
.../phoenix/spark/PhoenixRecordWritable.scala | 2 +-
.../org/apache/phoenix/spark/PhoenixRelation.scala | 2 +-
.../org/apache/phoenix/spark/SparkSchemaUtil.scala | 13 +++-
.../datasources/jdbc/PhoenixJdbcDialect.scala | 2 +-
.../execution/datasources/jdbc/SparkJdbcUtil.scala | 5 +-
.../datasource/v2/PhoenixTestingDataSource.java | 53 ++++++++++++++++
.../v2/reader/PhoenixTestingDataSourceReader.java} | 21 +++---
.../v2/reader/PhoenixTestingInputPartition.java} | 20 ++----
.../PhoenixTestingInputPartitionReader.java} | 25 ++++----
.../v2/writer/PhoenixTestingDataSourceWriter.java} | 30 ++++-----
.../v2/writer/PhoenixTestingDataWriter.java} | 30 ++++-----
.../writer/PhoenixTestingDataWriterFactory.java} | 12 ++--
.../writer/PhoenixTestingWriterCommitMessage.java} | 20 +++---
26 files changed, 367 insertions(+), 181 deletions(-)
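
At a high level, the fix makes PhoenixDataWriter count the rows it has upserted and commit the underlying Phoenix connection each time the configured batch size is reached, instead of committing only once when Spark finalizes the task. The sketch below distills that logic from the PhoenixDataWriter hunk further down; the class name and constructor are illustrative stand-ins, while the batch-size lookup and the commit-every-batch behaviour come from the patch itself.

    // Hedged sketch (not the actual patched class): a writer that commits every batchSize rows.
    // batchSize would come from PhoenixConfigurationUtil.UPSERT_BATCH_SIZE in the real code.
    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    class BatchingPhoenixWriterSketch {
        private final Connection conn;
        private final PreparedStatement statement;   // prepared UPSERT for the target table
        private final long batchSize;                // UPSERT_BATCH_SIZE, or the default
        private long numRecords = 0;

        BatchingPhoenixWriterSketch(Connection conn, PreparedStatement statement, long batchSize) {
            this.conn = conn;
            this.statement = statement;
            this.batchSize = batchSize;
        }

        // Called once per row; the real write(InternalRow) binds the row's columns first.
        void write() throws IOException {
            try {
                numRecords++;
                statement.execute();
                if (numRecords % batchSize == 0) {
                    conn.commit();   // the fix: flush each full batch instead of waiting for commit()
                }
            } catch (SQLException e) {
                throw new IOException("Exception while executing Phoenix prepared statement", e);
            }
        }

        // Spark still calls commit() at task end, which flushes any remaining partial batch.
        void commit() throws IOException {
            try {
                conn.commit();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }
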
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 0bbd983..56d2111 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -478,8 +478,8 @@
</dependencies>
<build>
- <testSourceDirectory>src/it/scala</testSourceDirectory>
- <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <testResources><testResource><directory>src/test/resources</directory></testResource></testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git
a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index b40b638..58910ce 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -16,12 +16,17 @@ package org.apache.phoenix.spark
import java.sql.DriverManager
import java.util.Date
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
+import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.SparkException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
+import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
@@ -249,6 +254,30 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
count shouldEqual 1L
}
+ test("Can use extraOptions to set configs for workers during reads") {
+ // Pass in true, so we will get null when fetching the current row, leading to an NPE
+ var extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW + "=true"
+ var rdd = spark.sqlContext.read
+ .format(PhoenixTestingDataSource.TEST_SOURCE)
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+ PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+
+ // Expect to get a NullPointerException in the executors
+ var error = intercept[SparkException] {
+ rdd.take(2)(0)(1)
+ }
+ assert(error.getCause.isInstanceOf[NullPointerException])
+
+ // Pass in false, so we will get the expected rows
+ extraOptions = PhoenixTestingInputPartitionReader.RETURN_NULL_CURR_ROW + "=false"
+ rdd = spark.sqlContext.read
+ .format(PhoenixTestingDataSource.TEST_SOURCE)
+ .options( Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+ PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions)).load
+ val stringValue = rdd.take(2)(0)(1)
+ stringValue shouldEqual "test_row_1"
+ }
+
test("Can save to phoenix table") {
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))
@@ -282,6 +311,47 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
}
}
+ test("Can use extraOptions to set configs for workers during writes") {
+ val totalRecords = 100
+ val upsertBatchSize = 5
+
+ val records = new mutable.MutableList[Row]
+ for (x <- 1 to totalRecords) {
+ records += Row(x.toLong, x.toString, x)
+ }
+ val dataSet = records.toList
+
+ val schema = StructType(
+ Seq(StructField("ID", LongType, nullable = false),
+ StructField("COL1", StringType),
+ StructField("COL2", IntegerType)))
+
+ // Distribute the dataset into an RDD with just 1 partition so we use only 1 executor.
+ // This makes it easy to deterministically count the batched commits from that executor
+ // since it corresponds to exactly 1 input partition. In case of multiple executors with
+ // an uneven distribution of input partitions, if
+ // (number of records in that partition) % batchSize != 0, some updates would also be committed
+ // via PhoenixDataWriter#commit rather than the batch commits via PhoenixDataWriter#write
+ // and those would thus, not be counted by PhoenixTestingDataWriter.
+ val rowRDD = spark.sparkContext.parallelize(dataSet, 1)
+
+ // Apply the schema to the RDD.
+ val df = spark.sqlContext.createDataFrame(rowRDD, schema)
+ val extraOptions = PhoenixConfigurationUtil.UPSERT_BATCH_SIZE + "=" + upsertBatchSize.toString
+
+ // Initially, this should be zero
+ PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual 0
+ df.write
+ .format(PhoenixTestingDataSource.TEST_SOURCE)
+ .options(Map("table" -> "OUTPUT_TEST_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress,
+ PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ // Verify the number of times batched updates are committed via DataWriters
+ PhoenixTestingDataSourceWriter.TOTAL_BATCHES_COMMITTED_COUNT shouldEqual totalRecords/upsertBatchSize
+ }
+
test("Can save dates to Phoenix using java.sql.Date") {
val date = java.sql.Date.valueOf("2016-09-30")
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
index 02b5edf..21250af 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
@@ -21,11 +21,8 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.log4j.Logger;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriteOptions;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDatasourceWriter;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -52,20 +49,9 @@ public class PhoenixDataSource implements DataSourceV2,
ReadSupport, WriteSupp
}
@Override
- public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode,
- DataSourceOptions options) {
- if (!mode.equals(SaveMode.Overwrite)) {
- throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
- }
- if (!options.tableName().isPresent()) {
- throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
- }
- if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
- throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
- }
-
- PhoenixDataSourceWriteOptions writeOptions = createPhoenixDataSourceWriteOptions(options, schema);
- return Optional.of(new PhoenixDatasourceWriter(writeOptions));
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+ SaveMode mode, DataSourceOptions options) {
+ return Optional.of(new PhoenixDataSourceWriter(mode, schema, options));
}
/**
@@ -102,19 +88,6 @@ public class PhoenixDataSource implements DataSourceV2,
ReadSupport, WriteSupp
return confToSet;
}
- private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
- StructType schema) {
- String scn = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE).orElse(null);
- String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
- String zkUrl = options.get(ZOOKEEPER_URL).get();
- boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
- return new PhoenixDataSourceWriteOptions.Builder().setTableName(options.tableName().get())
- .setZkUrl(zkUrl).setScn(scn).setTenantId(tenantId).setSchema(schema)
- .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
- .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
- .build();
- }
-
@Override
public String shortName() {
return "phoenix";
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 70062c8..67343d4 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.Properties;
-public class PhoenixDataSourceReadOptions implements Serializable {
+class PhoenixDataSourceReadOptions implements Serializable {
private final String tenantId;
private final String zkUrl;
@@ -30,7 +30,7 @@ public class PhoenixDataSourceReadOptions implements
Serializable {
private final String selectStatement;
private final Properties overriddenProps;
- public PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
+ PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
String selectStatement, Properties overriddenProps) {
Preconditions.checkNotNull(overriddenProps);
this.zkUrl = zkUrl;
@@ -40,23 +40,23 @@ public class PhoenixDataSourceReadOptions implements
Serializable {
this.overriddenProps = overriddenProps;
}
- public String getSelectStatement() {
+ String getSelectStatement() {
return selectStatement;
}
- public String getScn() {
+ String getScn() {
return scn;
}
- public String getZkUrl() {
+ String getZkUrl() {
return zkUrl;
}
- public String getTenantId() {
+ String getTenantId() {
return tenantId;
}
- public Properties getOverriddenProps() {
+ Properties getOverriddenProps() {
return overriddenProps;
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 8476509..18e304b 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -85,7 +85,7 @@ public class PhoenixDataSourceReader implements
DataSourceReader, SupportsPushDo
}
this.options = options;
this.tableName = options.tableName().get();
- this.zkUrl = options.get("zkUrl").get();
+ this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
setSchema();
@@ -106,6 +106,11 @@ public class PhoenixDataSourceReader implements
DataSourceReader, SupportsPushDo
}
}
+ PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+ PhoenixInputSplit inputSplit) {
+ return new PhoenixInputPartition(readOptions, schema, inputSplit);
+ }
+
@Override
public StructType readSchema() {
return schema;
@@ -175,19 +180,18 @@ public class PhoenixDataSourceReader implements
DataSourceReader, SupportsPushDo
// Get the region size
long regionSize = sizeCalculator.getRegionSize(
- location.getRegionInfo().getRegionName()
- );
+ location.getRegionInfo().getRegionName());
PhoenixDataSourceReadOptions phoenixDataSourceOptions =
new PhoenixDataSourceReadOptions(zkUrl,
currentScnValue.orElse(null),
tenantId.orElse(null), selectStatement,
overriddenProps);
if (splitByStats) {
for (Scan aScan : scans) {
- partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ partitions.add(getInputPartition(phoenixDataSourceOptions,
new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)));
}
} else {
- partitions.add(new PhoenixInputPartition(phoenixDataSourceOptions, schema,
+ partitions.add(getInputPartition(phoenixDataSourceOptions,
new PhoenixInputSplit(scans, regionSize, regionLocation)));
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
index 624ff0f..e4adc07 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
@@ -26,16 +26,28 @@ import org.apache.spark.sql.types.StructType;
public class PhoenixInputPartition implements InputPartition<InternalRow> {
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
- private PhoenixDataSourceReadOptions options;
+ private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private final StructType schema;
+ private final PhoenixDataSourceReadOptions options;
- public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
+ PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
this.schema = schema;
this.options = options;
}
+ PhoenixDataSourceReadOptions getOptions() {
+ return options;
+ }
+
+ StructType getSchema() {
+ return schema;
+ }
+
+ SerializableWritable<PhoenixInputSplit> getPhoenixInputSplit() {
+ return phoenixInputSplit;
+ }
+
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index 6f6413b..49f6c13 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -62,25 +62,30 @@ import static
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
public class PhoenixInputPartitionReader implements InputPartitionReader<InternalRow> {
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
+ private final SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
+ private final StructType schema;
+ private final PhoenixDataSourceReadOptions options;
private Iterator<InternalRow> iterator;
private PhoenixResultSet resultSet;
private InternalRow currentRow;
- private PhoenixDataSourceReadOptions options;
- public PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema, SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+ PhoenixInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+ SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
this.options = options;
this.phoenixInputSplit = phoenixInputSplit;
this.schema = schema;
initialize();
}
+ Properties getOverriddenPropsFromOptions() {
+ return options.getOverriddenProps();
+ }
+
private QueryPlan getQueryPlan() throws SQLException {
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
- Properties overridingProps = options.getOverriddenProps();
+ Properties overridingProps = getOverriddenPropsFromOptions();
if (scn != null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
}
@@ -95,8 +100,7 @@ public class PhoenixInputPartitionReader implements
InputPartitionReader<Interna
final PhoenixStatement pstmt =
statement.unwrap(PhoenixStatement.class);
// Optimize the query plan so that we potentially use secondary indexes
- final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
- return queryPlan;
+ return pstmt.optimizeQuery(selectStatement);
}
}
@@ -114,7 +118,8 @@ public class PhoenixInputPartitionReader implements
InputPartitionReader<Interna
ConnectionQueryServices services =
queryPlan.getContext().getConnection().getQueryServices();
services.clearTableRegionCache(tableNameBytes);
- long renewScannerLeaseThreshold = queryPlan.getContext().getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
+ long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
+ .getQueryServices().getRenewLeaseThresholdMilliSeconds();
for (Scan scan : scans) {
// For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
Bytes.toBytes(true));
@@ -131,13 +136,16 @@ public class PhoenixInputPartitionReader implements
InputPartitionReader<Interna
peekingResultIterator =
LookAheadResultIterator.wrap(tableResultIterator);
iterators.add(peekingResultIterator);
}
- ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
+ ResultIterator iterator = queryPlan.useRoundRobinIterator() ?
+ RoundRobinResultIterator.newIterator(iterators, queryPlan) :
+ ConcatResultIterator.newIterator(iterators);
if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
iterator = new SequenceResultIterator(iterator,
queryPlan.getContext().getSequenceManager());
}
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
+ this.resultSet = new PhoenixResultSet(iterator, queryPlan.getProjector().cloneIfNecessary(),
+ queryPlan.getContext());
this.iterator = SparkJdbcUtil.resultSetToSparkInternalRows(resultSet, schema, new InputMetrics());
}
catch (SQLException e) {
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
index 434f13c..c130db8 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.Properties;
-public class PhoenixDataSourceWriteOptions implements Serializable {
+class PhoenixDataSourceWriteOptions implements Serializable {
private final String tableName;
private final String zkUrl;
@@ -36,6 +36,9 @@ public class PhoenixDataSourceWriteOptions implements
Serializable {
private PhoenixDataSourceWriteOptions(String tableName, String zkUrl, String scn,
String tenantId, StructType schema, boolean skipNormalizingIdentifier,
Properties overriddenProps) {
+ Preconditions.checkNotNull(tableName);
+ Preconditions.checkNotNull(zkUrl);
+ Preconditions.checkNotNull(schema);
Preconditions.checkNotNull(overriddenProps);
this.tableName = tableName;
this.zkUrl = zkUrl;
@@ -46,35 +49,35 @@ public class PhoenixDataSourceWriteOptions implements
Serializable {
this.overriddenProps = overriddenProps;
}
- public String getScn() {
+ String getScn() {
return scn;
}
- public String getZkUrl() {
+ String getZkUrl() {
return zkUrl;
}
- public String getTenantId() {
+ String getTenantId() {
return tenantId;
}
- public StructType getSchema() {
+ StructType getSchema() {
return schema;
}
- public String getTableName() {
+ String getTableName() {
return tableName;
}
- public boolean skipNormalizingIdentifier() {
+ boolean skipNormalizingIdentifier() {
return skipNormalizingIdentifier;
}
- public Properties getOverriddenProps() {
+ Properties getOverriddenProps() {
return overriddenProps;
}
- public static class Builder {
+ static class Builder {
private String tableName;
private String zkUrl;
private String scn;
@@ -83,42 +86,42 @@ public class PhoenixDataSourceWriteOptions implements
Serializable {
private boolean skipNormalizingIdentifier;
private Properties overriddenProps = new Properties();
- public Builder setTableName(String tableName) {
+ Builder setTableName(String tableName) {
this.tableName = tableName;
return this;
}
- public Builder setZkUrl(String zkUrl) {
+ Builder setZkUrl(String zkUrl) {
this.zkUrl = zkUrl;
return this;
}
- public Builder setScn(String scn) {
+ Builder setScn(String scn) {
this.scn = scn;
return this;
}
- public Builder setTenantId(String tenantId) {
+ Builder setTenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}
- public Builder setSchema(StructType schema) {
+ Builder setSchema(StructType schema) {
this.schema = schema;
return this;
}
- public Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
+ Builder setSkipNormalizingIdentifier(boolean skipNormalizingIdentifier) {
this.skipNormalizingIdentifier = skipNormalizingIdentifier;
return this;
}
- public Builder setOverriddenProps(Properties overriddenProps) {
+ Builder setOverriddenProps(Properties overriddenProps) {
this.overriddenProps = overriddenProps;
return this;
}
- public PhoenixDataSourceWriteOptions build() {
+ PhoenixDataSourceWriteOptions build() {
return new PhoenixDataSourceWriteOptions(tableName, zkUrl, scn, tenantId, schema,
skipNormalizingIdentifier, overriddenProps);
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 6793673..04670d5 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.log4j.Logger;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -40,16 +41,22 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import com.google.common.collect.Lists;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_UPSERT_BATCH_SIZE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.UPSERT_BATCH_SIZE;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
public class PhoenixDataWriter implements DataWriter<InternalRow> {
+ private static final Logger logger = Logger.getLogger(PhoenixDataWriter.class);
private final StructType schema;
private final Connection conn;
private final PreparedStatement statement;
+ private final long batchSize;
+ private long numRecords = 0;
- public PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
+ PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
String scn = options.getScn();
String tenantId = options.getTenantId();
String zkUrl = options.getZkUrl();
@@ -66,15 +73,21 @@ public class PhoenixDataWriter implements
DataWriter<InternalRow> {
overridingProps);
List<String> colNames =
Lists.newArrayList(options.getSchema().names());
if (!options.skipNormalizingIdentifier()){
- colNames = colNames.stream().map(colName -> SchemaUtil.normalizeIdentifier(colName)).collect(Collectors.toList());
+ colNames = colNames.stream().map(SchemaUtil::normalizeIdentifier).collect(Collectors.toList());
}
String upsertSql =
QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
this.statement = this.conn.prepareStatement(upsertSql);
+ this.batchSize = Long.valueOf(overridingProps.getProperty(UPSERT_BATCH_SIZE,
+ String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ void commitBatchUpdates() throws SQLException {
+ conn.commit();
+ }
+
@Override
public void write(InternalRow internalRow) throws IOException {
try {
@@ -90,14 +103,21 @@ public class PhoenixDataWriter implements
DataWriter<InternalRow> {
}
++i;
}
+ numRecords++;
statement.execute();
+ if (numRecords % batchSize == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("commit called on a batch of size : " +
batchSize);
+ }
+ commitBatchUpdates();
+ }
} catch (SQLException e) {
throw new IOException("Exception while executing Phoenix prepared
statement", e);
}
}
@Override
- public WriterCommitMessage commit() throws IOException {
+ public WriterCommitMessage commit() {
try {
conn.commit();
} catch (SQLException e) {
@@ -115,6 +135,6 @@ public class PhoenixDataWriter implements
DataWriter<InternalRow> {
}
@Override
- public void abort() throws IOException {
+ public void abort() {
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
index f7654e3..6fce8fe 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
@@ -25,10 +25,14 @@ public class PhoenixDataWriterFactory implements
DataWriterFactory<InternalRow>
private final PhoenixDataSourceWriteOptions options;
- public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+ PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
this.options = options;
}
+ PhoenixDataSourceWriteOptions getOptions() {
+ return options;
+ }
+
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
return new PhoenixDataWriter(options);
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
index 9d713b8..31a3065 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
@@ -17,17 +17,35 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
+
+public class PhoenixDataSourceWriter implements DataSourceWriter {
private final PhoenixDataSourceWriteOptions options;
- public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
+ if (!mode.equals(SaveMode.Overwrite)) {
+ throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
+ }
+ if (!options.tableName().isPresent()) {
+ throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
+ }
+ if (!options.get(ZOOKEEPER_URL).isPresent()) {
+ throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
+ }
+ this.options = createPhoenixDataSourceWriteOptions(options, schema);
}
@Override
@@ -47,4 +65,25 @@ public class PhoenixDatasourceWriter implements
DataSourceWriter {
@Override
public void abort(WriterCommitMessage[] messages) {
}
+
+ PhoenixDataSourceWriteOptions getOptions() {
+ return options;
+ }
+
+ private PhoenixDataSourceWriteOptions createPhoenixDataSourceWriteOptions(DataSourceOptions options,
+ StructType schema) {
+ String scn = options.get(CURRENT_SCN_VALUE).orElse(null);
+ String tenantId = options.get(PhoenixRuntime.TENANT_ID_ATTRIB).orElse(null);
+ String zkUrl = options.get(ZOOKEEPER_URL).get();
+ boolean skipNormalizingIdentifier = options.getBoolean(SKIP_NORMALIZING_IDENTIFIER, false);
+ return new PhoenixDataSourceWriteOptions.Builder()
+ .setTableName(options.tableName().get())
+ .setZkUrl(zkUrl)
+ .setScn(scn)
+ .setTenantId(tenantId)
+ .setSchema(schema)
+ .setSkipNormalizingIdentifier(skipNormalizingIdentifier)
+ .setOverriddenProps(extractPhoenixHBaseConfFromOptions(options))
+ .build();
+ }
}
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
index 1d6973c..a2ec2dc 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/FilterExpressionCompiler.scala
@@ -23,7 +23,8 @@ import java.text.Format
import org.apache.phoenix.util.{DateUtil, SchemaUtil}
import org.apache.phoenix.util.StringUtil.escapeStringConstant
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In,
+IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
class FilterExpressionCompiler() {
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 34033b7..89d808d 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -16,16 +16,15 @@ package org.apache.phoenix.spark
import java.sql.DriverManager
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.jdbc.PhoenixDriver
import org.apache.phoenix.mapreduce.PhoenixInputFormat
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.query.HBaseFactoryProvider
-import org.apache.spark._
+import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConverters._
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 6d4c4cc..66c347e 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -15,7 +15,7 @@ package org.apache.phoenix.spark
import java.sql.{PreparedStatement, ResultSet}
import org.apache.hadoop.mapreduce.lib.db.DBWritable
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PDataType, PDate, PVarbinary, PhoenixArray}
import org.apache.phoenix.util.ColumnInfo
import org.joda.time.DateTime
import scala.collection.{mutable, immutable}
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 2f6ea8c..aacd460 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -19,7 +19,7 @@ package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{BaseRelation, PrunedFilteredScan, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
diff --git
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
index f69e988..363acf8 100644
---
a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
+++
b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/SparkSchemaUtil.scala
@@ -18,9 +18,18 @@
package org.apache.phoenix.spark
import org.apache.phoenix.query.QueryConstants
-import org.apache.phoenix.schema.types._
+import org.apache.phoenix.schema.types.{PBinary, PBinaryArray, PBoolean, PBooleanArray, PChar,
+PCharArray, PDate, PDateArray, PDecimal, PDecimalArray, PDouble, PDoubleArray, PFloat, PFloatArray,
+PInteger, PIntegerArray, PLong, PLongArray, PSmallint, PSmallintArray, PTime, PTimeArray,
+PTimestamp, PTimestampArray, PTinyint, PTinyintArray, PUnsignedDate, PUnsignedDateArray,
+PUnsignedDouble, PUnsignedDoubleArray, PUnsignedFloat, PUnsignedFloatArray, PUnsignedInt,
+PUnsignedIntArray, PUnsignedLong, PUnsignedLongArray, PUnsignedSmallint, PUnsignedSmallintArray,
+PUnsignedTime, PUnsignedTimeArray, PUnsignedTimestamp, PUnsignedTimestampArray, PUnsignedTinyint,
+PUnsignedTinyintArray, PVarbinary, PVarbinaryArray, PVarchar, PVarcharArray}
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField,
+StructType, TimestampType}
object SparkSchemaUtil {
diff --git
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
index 01437f0..f89a451 100644
---
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
+++
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/PhoenixJdbcDialect.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources.jdbc
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{BinaryType, ByteType, DataType, StringType}
private object PhoenixJdbcDialect extends JdbcDialect {
diff --git
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index eac483a..50cdbf5 100644
---
a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++
b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -27,10 +27,11 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils._
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType,
+Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, ShortType, StringType,
+StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator
diff --git
a/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
new file mode 100644
index 0000000..6a2299e
--- /dev/null
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/PhoenixTestingDataSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark.datasource.v2;
+
+import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingDataSourceReader;
+import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Optional;
+
+public class PhoenixTestingDataSource extends PhoenixDataSource {
+
+ public static final String TEST_SOURCE =
+ "org.apache.phoenix.spark.datasource.v2.PhoenixTestingDataSource";
+
+ // Override to return a test DataSourceReader
+ @Override
+ public DataSourceReader createReader(DataSourceOptions options) {
+ return new PhoenixTestingDataSourceReader(options);
+ }
+
+ // Override to return a test DataSourceWriter
+ @Override
+ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema,
+ SaveMode mode, DataSourceOptions options) {
+ return Optional.of(new PhoenixTestingDataSourceWriter(mode, schema, options));
+ }
+
+ @Override
+ public String shortName() {
+ return "phoenixTesting";
+ }
+
+}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
similarity index 57%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
index f7654e3..0116255 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingDataSourceReader.java
@@ -15,22 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.phoenix.spark.datasource.v2.writer;
+package org.apache.phoenix.spark.datasource.v2.reader;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
-public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+public class PhoenixTestingDataSourceReader extends PhoenixDataSourceReader {
- private final PhoenixDataSourceWriteOptions options;
-
- public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ public PhoenixTestingDataSourceReader(DataSourceOptions options) {
+ super(options);
}
+ // Override to return a test InputPartition
@Override
- public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- return new PhoenixDataWriter(options);
+ PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
+ PhoenixInputSplit inputSplit) {
+ return new PhoenixTestingInputPartition(readOptions, readSchema(), inputSplit);
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
similarity index 62%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
index 624ff0f..eca7fc7 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartition.java
@@ -18,27 +18,21 @@
package org.apache.phoenix.spark.datasource.v2.reader;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
-import org.apache.spark.SerializableWritable;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
-public class PhoenixInputPartition implements InputPartition<InternalRow> {
+public class PhoenixTestingInputPartition extends PhoenixInputPartition {
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
- private PhoenixDataSourceReadOptions options;
-
- public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
- this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
- this.schema = schema;
- this.options = options;
+ PhoenixTestingInputPartition(PhoenixDataSourceReadOptions options, StructType schema,
+ PhoenixInputSplit phoenixInputSplit) {
+ super(options, schema, phoenixInputSplit);
}
+ // Override to return a test InputPartitionReader for testing on the executor-side
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
+ return new PhoenixTestingInputPartitionReader(getOptions(), getSchema(),
+ getPhoenixInputSplit());
}
-
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
similarity index 55%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
index 624ff0f..c85dc44 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartition.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixTestingInputPartitionReader.java
@@ -20,25 +20,26 @@ package org.apache.phoenix.spark.datasource.v2.reader;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.spark.SerializableWritable;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
-public class PhoenixInputPartition implements InputPartition<InternalRow> {
+import java.util.Properties;
- private SerializableWritable<PhoenixInputSplit> phoenixInputSplit;
- private StructType schema;
- private PhoenixDataSourceReadOptions options;
+public class PhoenixTestingInputPartitionReader extends PhoenixInputPartitionReader {
- public PhoenixInputPartition(PhoenixDataSourceReadOptions options, StructType schema, PhoenixInputSplit phoenixInputSplit) {
- this.phoenixInputSplit = new SerializableWritable<>(phoenixInputSplit);
- this.schema = schema;
- this.options = options;
+ // A test property which is used to modify the current row returned by the test input
+ // partition reader in order to check properties passed from the driver to executors
+ public static final String RETURN_NULL_CURR_ROW = "return.null.curr.row";
+
+ PhoenixTestingInputPartitionReader(PhoenixDataSourceReadOptions options, StructType schema,
+ SerializableWritable<PhoenixInputSplit> phoenixInputSplit) {
+ super(options, schema, phoenixInputSplit);
}
+ // Override to return null rather than the actual row based on a property passed to the executor
@Override
- public InputPartitionReader<InternalRow> createPartitionReader() {
- return new PhoenixInputPartitionReader(options, schema, phoenixInputSplit);
+ public InternalRow get() {
+ Properties props = getOverriddenPropsFromOptions();
+ return Boolean.valueOf(props.getProperty(RETURN_NULL_CURR_ROW)) ? null : super.get();
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
similarity index 58%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
index 9d713b8..cbba62b 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataSourceWriter.java
@@ -17,34 +17,34 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+public class PhoenixTestingDataSourceWriter extends PhoenixDataSourceWriter {
- private final PhoenixDataSourceWriteOptions options;
+ // Used to keep track of the total number of batches committed across all executors
+ public static int TOTAL_BATCHES_COMMITTED_COUNT = 0;
- public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ public PhoenixTestingDataSourceWriter(SaveMode mode, StructType schema,
+ DataSourceOptions options) {
+ super(mode, schema, options);
}
+ // Override to return a test DataWriterFactory
@Override
public DataWriterFactory<InternalRow> createWriterFactory() {
- return new PhoenixDataWriterFactory(options);
- }
-
- @Override
- public boolean useCommitCoordinator() {
- return false;
+ return new PhoenixTestingDataWriterFactory(getOptions());
}
+ // Override to sum up the total number of batches committed across all executors
@Override
public void commit(WriterCommitMessage[] messages) {
- }
-
- @Override
- public void abort(WriterCommitMessage[] messages) {
+ for (WriterCommitMessage message : messages) {
+ TOTAL_BATCHES_COMMITTED_COUNT += Integer.parseInt(message.toString());
+ }
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
similarity index 56%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
index 9d713b8..75aa447 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDatasourceWriter.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriter.java
@@ -17,34 +17,30 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-public class PhoenixDatasourceWriter implements DataSourceWriter {
+import java.sql.SQLException;
- private final PhoenixDataSourceWriteOptions options;
+public class PhoenixTestingDataWriter extends PhoenixDataWriter {
- public PhoenixDatasourceWriter(PhoenixDataSourceWriteOptions options) {
- this.options = options;
- }
+ private long numBatchesCommitted = 0;
- @Override
- public DataWriterFactory<InternalRow> createWriterFactory() {
- return new PhoenixDataWriterFactory(options);
+ PhoenixTestingDataWriter(PhoenixDataSourceWriteOptions options) {
+ super(options);
}
+ // Override to also count the number of times we call this method to test upsert batch commits
@Override
- public boolean useCommitCoordinator() {
- return false;
+ void commitBatchUpdates() throws SQLException {
+ super.commitBatchUpdates();
+ numBatchesCommitted++;
}
+ // Override to return a test WriterCommitMessage
@Override
- public void commit(WriterCommitMessage[] messages) {
+ public WriterCommitMessage commit() {
+ super.commit();
+ return new PhoenixTestingWriterCommitMessage(numBatchesCommitted);
}
- @Override
- public void abort(WriterCommitMessage[] messages) {
- }
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
similarity index 76%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
index f7654e3..4288128 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingDataWriterFactory.java
@@ -19,18 +19,16 @@ package org.apache.phoenix.spark.datasource.v2.writer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+public class PhoenixTestingDataWriterFactory extends PhoenixDataWriterFactory {
- private final PhoenixDataSourceWriteOptions options;
-
- public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ PhoenixTestingDataWriterFactory(PhoenixDataSourceWriteOptions options) {
+ super(options);
}
+ // Override to return a test DataWriter
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- return new PhoenixDataWriter(options);
+ return new PhoenixTestingDataWriter(getOptions());
}
}
diff --git
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
similarity index 57%
copy from
phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
copy to
phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
index f7654e3..1e43e07 100644
---
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriterFactory.java
+++
b/phoenix-spark/src/test/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixTestingWriterCommitMessage.java
@@ -17,20 +17,22 @@
*/
package org.apache.phoenix.spark.datasource.v2.writer;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-public class PhoenixDataWriterFactory implements DataWriterFactory<InternalRow> {
+class PhoenixTestingWriterCommitMessage implements WriterCommitMessage {
- private final PhoenixDataSourceWriteOptions options;
+ private final long numBatchesCommitted;
- public PhoenixDataWriterFactory(PhoenixDataSourceWriteOptions options) {
- this.options = options;
+ PhoenixTestingWriterCommitMessage(long numBatchesCommitted) {
+ this.numBatchesCommitted = numBatchesCommitted;
}
+ // Override to keep track of the number of batches committed by the corresponding DataWriter
+ // in the WriterCommitMessage, so we can observe this value in the driver when we call
+ // {@link PhoenixTestingDataSourceWriter#commit(WriterCommitMessage[])}
@Override
- public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- return new PhoenixDataWriter(options);
+ public String toString() {
+ return String.valueOf(this.numBatchesCommitted);
}
+
}
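
For reference, the per-executor upsert batch size can now be tuned from a Spark job by forwarding a Phoenix/HBase config string through the connector's generic config option, the same way the new integration test does. Below is a minimal, hypothetical Java usage sketch: the table name, ZooKeeper quorum, and batch size value are placeholders, while the option keys and constants (PHOENIX_CONFIGS, ZOOKEEPER_URL, UPSERT_BATCH_SIZE, the "phoenix" short name) are the ones used in this patch.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class UpsertBatchSizeExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("phoenix-upsert-batch-size")
                    .master("local[*]")                       // placeholder master
                    .getOrCreate();

            // Build a small DataFrame matching the target table's schema.
            List<Row> rows = Arrays.asList(
                    RowFactory.create(1L, "1", 1),
                    RowFactory.create(2L, "2", 2));
            StructType schema = new StructType()
                    .add("ID", DataTypes.LongType, false)
                    .add("COL1", DataTypes.StringType)
                    .add("COL2", DataTypes.IntegerType);
            Dataset<Row> df = spark.createDataFrame(rows, schema);

            // Forward UPSERT_BATCH_SIZE to the executors through the generic Phoenix
            // config option, as the new IT does; "5" is an arbitrary example value.
            String extraOptions = PhoenixConfigurationUtil.UPSERT_BATCH_SIZE + "=5";

            df.write()
              .format("phoenix")                                           // PhoenixDataSource#shortName()
              .option("table", "OUTPUT_TEST_TABLE")                        // placeholder table
              .option(PhoenixDataSource.ZOOKEEPER_URL, "localhost:2181")   // placeholder quorum
              .option(PhoenixDataSource.PHOENIX_CONFIGS, extraOptions)
              .mode(SaveMode.Overwrite)
              .save();

            spark.stop();
        }
    }
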