chrajeshbabu commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r872323611
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
Review Comment:
Remove unnecessary parentheses.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
Review Comment:
Make the log message clearer, e.g. "Generating input splits in serial".
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
- try (org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
{
- RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
+ qplan.getTableRef().getTable()
+ .getPhysicalName().toString()));
PhoenixInputSplit inputSplit;
- HRegionLocation location =
regionLocator.getRegionLocation(scans.get(0).getStartRow()
- , false);
- long regionSize = CompatUtil.getSize(regionLocator,
connection.getAdmin(), location);
- String regionLocation =
PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false);
+ long regionSize = CompatUtil.getSize(regionLocator,
+ connection.getAdmin(),
+ location);
+ String regionLocation =
+ PhoenixStorageHandlerUtil.getRegionLocation(location,
+ LOG);
if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Split for scan : " + aScan + "with
scanAttribute : " + aScan
- .getAttributesMap() + " [scanCache,
cacheBlock, scanBatch] : [" +
- aScan.getCaching() + ", " +
aScan.getCacheBlocks() + ", " + aScan
- .getBatch() + "] and regionLocation : " +
regionLocation);
+ LOG.debug("Split for scan : "
Review Comment:
Format the code properly.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
- try (org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
{
- RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
Review Comment:
The exception message can be improved.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
Review Comment:
Reduce the number of lines used to call the method.
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ HivePhoenixInputFormatTest.class);
+ private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+ .toUpperCase(Locale.ROOT);
+ private static final String DDL = "CREATE TABLE "
+ + TABLE_NAME
+ + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+ private static final int SPLITS = 128;
+
+ /*
+ *
+ * This test will create phoenix table with 128 splits and compare
+ * performance of split generation in serial/parallel
+ *
+ * */
+ @Test
+ public void testGetSplitsSerialOrParallel() throws
IOException,SQLException {
+ PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+ new PhoenixInputFormat<PhoenixRecordWritable>();
+ long start,end;
+
+ // create table with N splits
+ System.out.println(
+ String.format("generate testing table with %s splits",
+ String.valueOf(SPLITS)));
+ setupTestTable();
+ // setup configuration required for PhoenixInputFormat
+ Configuration conf = getUtility().getConfiguration();
+ JobConf jobConf = new JobConf(conf);
+ configureTestInput(jobConf);
+
+
+ // test get splits in serial
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold","0");
+ InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+ end = System.currentTimeMillis();
+ long durationInSerial=end - start;
+ System.out.println(String.format(
Review Comment:
We need to use assertions; there is no use in printing these values to the logs.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
- try (org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
{
- RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
Review Comment:
The connection can be created once, then shared and reused when generating
the input splits.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
- try (org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
{
- RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
Review Comment:
The region locator can also be shared across calls.
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ HivePhoenixInputFormatTest.class);
+ private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+ .toUpperCase(Locale.ROOT);
+ private static final String DDL = "CREATE TABLE "
+ + TABLE_NAME
+ + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+ private static final int SPLITS = 128;
+
+ /*
+ *
+ * This test will create phoenix table with 128 splits and compare
+ * performance of split generation in serial/parallel
+ *
+ * */
+ @Test
+ public void testGetSplitsSerialOrParallel() throws
IOException,SQLException {
+ PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+ new PhoenixInputFormat<PhoenixRecordWritable>();
+ long start,end;
+
+ // create table with N splits
+ System.out.println(
+ String.format("generate testing table with %s splits",
+ String.valueOf(SPLITS)));
+ setupTestTable();
+ // setup configuration required for PhoenixInputFormat
+ Configuration conf = getUtility().getConfiguration();
+ JobConf jobConf = new JobConf(conf);
+ configureTestInput(jobConf);
+
+
+ // test get splits in serial
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold","0");
+ InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+ end = System.currentTimeMillis();
+ long durationInSerial=end - start;
+ System.out.println(String.format(
+ "get split in serial requires:%s ms",
+ String.valueOf(durationInSerial)));
+
+ // test get splits in parallel
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold", "1");
+ InputSplit[] inputSplitsParallel = inputFormat.getSplits(
+ jobConf,
+ SPLITS);
+ end = System.currentTimeMillis();
+ long durationInParallel=end - start;
+
+ System.out.println(String.format(
+ "get split in parallel requires:%s ms",
+ String.valueOf(durationInParallel)));
+
+ // Test if performance of parallel method is better than serial method
+ Assert.assertTrue(durationInParallel < durationInSerial);
+ // Test if the input split returned by serial method and parallel
method are the same
+ Assert.assertTrue(inputSplitsParallel.length==SPLITS);
+ Assert.assertTrue(
+ inputSplitsParallel.length == inputSplitsSerial.length
+ );
+ for (final InputSplit inputSplitParallel:inputSplitsParallel){
+ boolean match=false;
+ for (final InputSplit inputSplitSerial:inputSplitsSerial){
+ if (inputSplitParallel.equals(inputSplitSerial)){
+ match=true;
+ break;
+ }
+ }
+ Assert.assertTrue(match);
+ }
+ }
+
+ private static void setupTestTable() throws SQLException {
+ final byte [] start=new byte[0];
+ final byte [] end = Bytes.createMaxByteArray(4);
+ final byte[][] splits = Bytes.split(start, end, SPLITS-2);
+ createTestTableWithBinarySplit(getUrl(),DDL, splits ,null);
+ }
+
+ private static void buildPreparedSqlWithBinarySplits(
+ StringBuffer sb,
+ int splits)
+ {
Review Comment:
Code formatting is required.
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
Review Comment:
What's the difference between this parallelism level config and the parallel
threshold?
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int
numSplits) throws IOException
private List<InputSplit> generateSplits(final JobConf jobConf, final
QueryPlan qplan,
final List<KeyRange> splits,
String query) throws
IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
- Path[] tablePaths =
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
- .newJobContext(new Job(jobConf)));
- boolean splitByStats =
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
- try (org.apache.hadoop.hbase.client.Connection connection =
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
{
- RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(qplan
- .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
+ qplan.getTableRef().getTable()
+ .getPhysicalName().toString()));
PhoenixInputSplit inputSplit;
- HRegionLocation location =
regionLocator.getRegionLocation(scans.get(0).getStartRow()
- , false);
- long regionSize = CompatUtil.getSize(regionLocator,
connection.getAdmin(), location);
- String regionLocation =
PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false);
+ long regionSize = CompatUtil.getSize(regionLocator,
+ connection.getAdmin(),
+ location);
+ String regionLocation =
+ PhoenixStorageHandlerUtil.getRegionLocation(location,
+ LOG);
if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Split for scan : " + aScan + "with
scanAttribute : " + aScan
- .getAttributesMap() + " [scanCache,
cacheBlock, scanBatch] : [" +
- aScan.getCaching() + ", " +
aScan.getCacheBlocks() + ", " + aScan
- .getBatch() + "] and regionLocation : " +
regionLocation);
+ LOG.debug("Split for scan : "
+ + aScan
+ + "with scanAttribute : "
+ + aScan.getAttributesMap()
+ + " [scanCache, cacheBlock, scanBatch] : ["
+ + aScan.getCaching()
+ + ", "
+ + aScan.getCacheBlocks()
+ + ", "
+ + aScan.getBatch()
+ + "] and regionLocation : "
+ + regionLocation);
}
- inputSplit = new PhoenixInputSplit(new
ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
- regionLocation, regionSize);
+ inputSplit =
+ new PhoenixInputSplit(
+ new ArrayList<>(Arrays.asList(aScan)),
+ tablePaths[0],
+ regionLocation,
+ regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("Scan count[" + scans.size() + "] : " +
Bytes.toStringBinary(scans
- .get(0).getStartRow()) + " ~ " +
Bytes.toStringBinary(scans.get(scans
- .size() - 1).getStopRow()));
- LOG.debug("First scan : " + scans.get(0) + "with
scanAttribute : " + scans
- .get(0).getAttributesMap() + " [scanCache,
cacheBlock, scanBatch] : " +
- "[" + scans.get(0).getCaching() + ", " +
scans.get(0).getCacheBlocks()
- + ", " + scans.get(0).getBatch() + "] and
regionLocation : " +
- regionLocation);
+ LOG.debug("Scan count["
Review Comment:
Format properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]