This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new c4245bdc [FLINK-16413][hive] Reduce hive source parallelism when limit push down
c4245bdc is described below
commit c4245bdcd2e9ba48cdab9760d8120df60aa0e2d5
Author: JunZhang <[email protected]>
AuthorDate: Wed Mar 18 16:53:06 2020 +0800
[FLINK-16413][hive] Reduce hive source parallelism when limit push down
This closes #11429
---
.../flink/connectors/hive/HiveTableSource.java | 7 ++++-
.../flink/connectors/hive/HiveTableSourceTest.java | 34 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 5d654cf..98691b9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -138,6 +139,7 @@ public class HiveTableSource implements
 		HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
 		DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);
+		int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
 		if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
 			int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
if (max < 1) {
@@ -158,8 +160,11 @@ public class HiveTableSource implements
} catch (IOException e) {
throw new FlinkHiveException(e);
}
-			source.setParallelism(Math.min(Math.max(1, splitNum), max));
+			parallelism = Math.min(splitNum, max);
}
+		parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
+ parallelism = Math.max(1, parallelism);
+ source.setParallelism(parallelism);
return source.name(explainSource());
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..c6a1043 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -27,10 +27,13 @@ import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
@@ -414,6 +417,37 @@ public class HiveTableSourceTest {
}
@Test
+	public void testParallelismOnLimitPushDown() {
+		final String catalogName = "hive";
+		final String dbName = "source_db";
+		final String tblName = "test_parallelism_limit_pushdown";
+		hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+				"(year STRING, value INT) partitioned by (pt int);");
+		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+				.addRow(new Object[]{"2014", 3})
+				.addRow(new Object[]{"2014", 4})
+				.commit("pt=0");
+		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+				.addRow(new Object[]{"2015", 2})
+				.addRow(new Object[]{"2015", 5})
+				.commit("pt=1");
+		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		tEnv.getConfig().getConfiguration().setBoolean(
+				HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+		tEnv.getConfig().getConfiguration().setInteger(
+				ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
+		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
+		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
+		@SuppressWarnings("unchecked")
+		Transformation transformation = execNode.translateToPlan(planner);
+		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
+				.getInput().getParallelism());
+	}
+
+	@Test
public void testVectorReaderSwitch() throws Exception {
// vector reader not available for 1.x and we're not testing
orc for 2.0.x
Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);