[
https://issues.apache.org/jira/browse/PHOENIX-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542622#comment-17542622
]
ASF GitHub Bot commented on PHOENIX-6698:
-----------------------------------------
joshelser commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r882833463
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                    qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
Review Comment:
```suggestion
                final int parallelism = jobConf.getInt(
```
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                    qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
-
-            for (List<Scan> scans : qplan.getScans()) {
-                PhoenixInputSplit inputSplit;
-
-                HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                        , false);
-                long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-                String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
-
-                if (splitByStats) {
-                    for (Scan aScan : scans) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
-                                    .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                    aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                    .getBatch() + "] and regionLocation : " + regionLocation);
-                        }
+                List<Future<List<InputSplit>>> tasks = new ArrayList<>();
-                        inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
-                                regionLocation, regionSize);
-                        inputSplit.setQuery(query);
-                        psplits.add(inputSplit);
+                try {
+                    for (final List<Scan> scans : qplan.getScans()) {
+                        Future<List<InputSplit>> task = executorService.submit(
+                                new Callable<List<InputSplit>>() {
+                                    @Override public List<InputSplit> call() throws Exception {
+                                        return generateSplitsInternal(query, scans, splitByStats,
+                                                connection, regionLocator, tablePaths);
+                                    }
+                                });
+                        tasks.add(task);
+                    }
+                    for (Future<List<InputSplit>> task : tasks) {
+                        psplits.addAll(task.get());
+                    }
+                } catch (ExecutionException | InterruptedException exception) {
+                    throw new IOException("Failed to Generate Input Splits in Parallel, reason:",
+                            exception);
Review Comment:
Good to unwrap the ExecutionException and throw back the real exception. It
may already be an IOException which you can throw with a cast, rather than
rewrapping in another IOException.
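For illustration, a minimal sketch of that idiom (the helper name and shape are hypothetical, not from the patch):
```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper showing the unwrap-and-rethrow idiom suggested above.
static <T> T getUnwrapped(Future<T> task) throws IOException {
    try {
        return task.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof IOException) {
            // The real failure is already an IOException: rethrow it with a cast.
            throw (IOException) cause;
        }
        // Otherwise wrap the underlying cause, not the ExecutionException shell.
        throw new IOException("Failed to generate input splits in parallel", cause);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before converting to a checked IOException.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while generating input splits", e);
    }
}
```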
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                    qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
Review Comment:
```suggestion
                LOG.info("Generating Input Splits in Parallel with {} threads", parallism);
```
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class);
+    private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT);
+    private static final String DDL = "CREATE TABLE " + TABLE_NAME
+            + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+    private static final int SPLITS = 256;
+
+    // This test will create a phoenix table with 256 splits and compare the performance of
+    // the serial split-generation method and the parallel split-generation method.
+    @Test
+    public void testGetSplitsSerialOrParallel() throws IOException, SQLException {
+        PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+                new PhoenixInputFormat<PhoenixRecordWritable>();
+        long start;
+        long end;
+        // create table with N splits
+        System.out.println(
+                String.format("generate testing table with %s splits", String.valueOf(SPLITS)));
+        setupTestTable();
+        // setup configuration required for PhoenixInputFormat
+        Configuration conf = getUtility().getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        configureTestInput(jobConf);
+        inputFormat.getSplits(jobConf, SPLITS);
+        InputSplit[] inputSplitsSerial;
+        // test get splits in serial
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0");
+        inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInSerial = end - start;
+        System.out.println(String.format("get split in serial requires:%s ms",
+                String.valueOf(durationInSerial)));
+
+        // test get splits in parallel
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1");
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, "24");
+        InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInParallel = end - start;
+
+        System.out.println(String.format("get split in parallel requires:%s ms",
+                String.valueOf(durationInParallel)));
+
+        // Test if performance of parallel method is better than serial method
+        Assert.assertTrue(durationInParallel < durationInSerial);
Review Comment:
This will result in flaky tests, as the environments that run this test are
guaranteed not to be deterministic. Unit tests should be about functional
correctness, not performance.
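One way to keep the coverage deterministic, sketched against the test code above (this assumes both code paths should cover the table with equivalent splits):
```java
// Exercise both paths with identical inputs and assert functional
// equivalence, not relative timing.
jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0");
InputSplit[] serialSplits = inputFormat.getSplits(jobConf, SPLITS);

jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1");
jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, "24");
InputSplit[] parallelSplits = inputFormat.getSplits(jobConf, SPLITS);

// Both strategies must produce the same number of splits for the same table.
Assert.assertEquals(serialSplits.length, parallelSplits.length);
```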
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
Review Comment:
Do the existing Phoenix-hive tests activate your new property and implicitly
validate that it is functional? I think we have some test classes, but do we
create multi-region Phoenix tables in those tests (or tables with enough data
to have multiple guideposts)?
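For what it's worth, a test could guarantee multiple regions up front with a salted table, roughly like this (a sketch; the table name and bucket count are illustrative, and it assumes the usual `getUrl()` helper from the Phoenix test base classes):
```java
// Create a table that is pre-split into 16 regions at creation time,
// so getSplits() has real region boundaries to work with.
try (Connection conn = DriverManager.getConnection(getUrl());
     Statement stmt = conn.createStatement()) {
    stmt.execute("CREATE TABLE MULTI_REGION_TEST"
            + " (V1 VARCHAR NOT NULL PRIMARY KEY, V2 INTEGER)"
            + " SALT_BUCKETS = 16");
    conn.commit();
}
```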
> hive-connector will take a long time to generate splits for large phoenix tables.
> ----------------------------------------------------------------------------------
>
> Key: PHOENIX-6698
> URL: https://issues.apache.org/jira/browse/PHOENIX-6698
> Project: Phoenix
> Issue Type: Improvement
> Components: hive-connector
> Affects Versions: 5.1.0
> Reporter: jichen
> Assignee: jichen
> Priority: Minor
> Fix For: connectors-6.0.0
>
> Attachments: PHOENIX-6698.master.v1.patch
>
>
> In our production environment, the hive-phoenix connector takes nearly
> 30-40 minutes to generate splits for a large Phoenix table with more than
> 2048 regions. This is because in the class PhoenixInputFormat, the function
> 'generateSplits' uses only one thread to generate splits for each scan. My
> proposal is to use multiple threads to generate the splits in parallel. The
> proposal has been validated in our production environment: by changing the
> code to generate splits in parallel with 24 threads, the time cost is
> reduced to 2 minutes.
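For reference, the patch makes this opt-in through JobConf; a minimal sketch of enabling it with the 24 threads mentioned above (constant names are taken from the diff in the comment; the threshold value here is illustrative):
```java
JobConf jobConf = new JobConf(conf);
// Use the parallel path once a query plan has at least this many scans.
jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "2048");
// Size the split-generation thread pool; 24 matched the production numbers above.
jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT, "24");
```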
--
This message was sent by Atlassian Jira
(v8.20.7#820007)