This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c4245bdc [FLINK-16413][hive] Reduce hive source parallelism when limit push down
c4245bdc is described below

commit c4245bdcd2e9ba48cdab9760d8120df60aa0e2d5
Author: JunZhang <[email protected]>
AuthorDate: Wed Mar 18 16:53:06 2020 +0800

    [FLINK-16413][hive] Reduce hive source parallelism when limit push down
    
    
    This closes #11429
---
 .../flink/connectors/hive/HiveTableSource.java     |  7 ++++-
 .../flink/connectors/hive/HiveTableSourceTest.java | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 5d654cf..98691b9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -138,6 +139,7 @@ public class HiveTableSource implements
                HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
                DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);
 
+               int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
                if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
                        int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
                        if (max < 1) {
@@ -158,8 +160,11 @@ public class HiveTableSource implements
                        } catch (IOException e) {
                                throw new FlinkHiveException(e);
                        }
-                       source.setParallelism(Math.min(Math.max(1, splitNum), max));
+                       parallelism = Math.min(splitNum, max);
                }
+               parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
+               parallelism = Math.max(1, parallelism);
+               source.setParallelism(parallelism);
                return source.name(explainSource());
        }
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..c6a1043 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -27,10 +27,13 @@ import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
 import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
 import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -414,6 +417,37 @@ public class HiveTableSourceTest {
        }
 
        @Test
+       public void testParallelismOnLimitPushDown() {
+               final String catalogName = "hive";
+               final String dbName = "source_db";
+               final String tblName = "test_parallelism_limit_pushdown";
+               hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+                                       "(year STRING, value INT) partitioned by (pt int);");
+               HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+                                       .addRow(new Object[]{"2014", 3})
+                                       .addRow(new Object[]{"2014", 4})
+                                       .commit("pt=0");
+               HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+                                       .addRow(new Object[]{"2015", 2})
+                                       .addRow(new Object[]{"2015", 5})
+                                       .commit("pt=1");
+               TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+               tEnv.getConfig().getConfiguration().setBoolean(
+                       HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+               tEnv.getConfig().getConfiguration().setInteger(
+                       ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+               tEnv.registerCatalog(catalogName, hiveCatalog);
+               Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
+               PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
+               RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+               ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
+               @SuppressWarnings("unchecked")
+               Transformation transformation = execNode.translateToPlan(planner);
+               Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
+                       .getInput().getParallelism());
+       }
+
+       @Test
        public void testVectorReaderSwitch() throws Exception {
                // vector reader not available for 1.x and we're not testing orc for 2.0.x
                Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);

Reply via email to