lirui-apache commented on a change in pull request #17245:
URL: https://github.com/apache/flink/pull/17245#discussion_r715418958



##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -441,6 +470,93 @@ private void checkSuccessFiles(String path) {
         Assert.assertTrue(new File(new File(basePath, "e=11"), 
"_MY_SUCCESS").exists());
     }
 
+    private void testStreamingWriteWithCustomPartitionCommitPolicy(
+            String customPartitionCommitPolicyClassName) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        // avoid the job to restart infinitely
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1_000));
+
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvInStreamingMode(env);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+
+            // prepare source
+            List<Row> data =
+                    Arrays.asList(
+                            Row.of(1, "a", "b", "2020-05-03", "7"),
+                            Row.of(2, "p", "q", "2020-05-03", "8"),
+                            Row.of(3, "x", "y", "2020-05-03", "9"),
+                            Row.of(4, "x", "y", "2020-05-03", "10"),
+                            Row.of(5, "x", "y", "2020-05-03", "11"));
+            DataStream<Row> stream =
+                    env.addSource(
+                            new FiniteTestSource<>(data),
+                            new RowTypeInfo(
+                                    Types.INT,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING));
+            tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), 
$("c"), $("d"), $("e"));
+
+            // DDL
+            tEnv.executeSql(
+                    "create external table sink_table (a int,b string,c string"
+                            + ") "
+                            + "partitioned by (d string,e string) "
+                            + " stored as textfile"
+                            + " TBLPROPERTIES ("
+                            + "'"
+                            + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()

Review comment:
       We're using the default `process-time` commit trigger, so why do we need to 
set a partition time extractor?

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -441,6 +470,93 @@ private void checkSuccessFiles(String path) {
         Assert.assertTrue(new File(new File(basePath, "e=11"), 
"_MY_SUCCESS").exists());
     }
 
+    private void testStreamingWriteWithCustomPartitionCommitPolicy(
+            String customPartitionCommitPolicyClassName) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        // avoid the job to restart infinitely
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1_000));
+
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvInStreamingMode(env);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+
+            // prepare source
+            List<Row> data =
+                    Arrays.asList(
+                            Row.of(1, "a", "b", "2020-05-03", "7"),
+                            Row.of(2, "p", "q", "2020-05-03", "8"),
+                            Row.of(3, "x", "y", "2020-05-03", "9"),
+                            Row.of(4, "x", "y", "2020-05-03", "10"),
+                            Row.of(5, "x", "y", "2020-05-03", "11"));
+            DataStream<Row> stream =
+                    env.addSource(
+                            new FiniteTestSource<>(data),
+                            new RowTypeInfo(
+                                    Types.INT,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING));
+            tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), 
$("c"), $("d"), $("e"));
+
+            // DDL
+            tEnv.executeSql(
+                    "create external table sink_table (a int,b string,c string"
+                            + ") "
+                            + "partitioned by (d string,e string) "
+                            + " stored as textfile"
+                            + " TBLPROPERTIES ("
+                            + "'"
+                            + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()
+                            + "'='$d $e:00:00',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_DELAY.key()
+                            + "'='1h',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_POLICY_KIND.key()
+                            + "'='metastore,custom',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_POLICY_CLASS.key()
+                            + "'='"
+                            + customPartitionCommitPolicyClassName
+                            + "'"
+                            + ")");
+
+            // hive dialect only works with hive tables at the moment, switch 
to default dialect
+            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            tEnv.sqlQuery("select * from 
my_table").executeInsert("sink_table").await();
+
+            // check committed partitions for CustomizedCommitPolicy
+            Set<String> committedPaths =
+                    
TestCustomCommitPolicy.getCommittedPartitionPathsAndReset();
+            String base =
+                    URI.create(
+                                    hiveCatalog
+                                            
.getHiveTable(ObjectPath.fromString("db1.sink_table"))
+                                            .getSd()
+                                            .getLocation())
+                            .getPath();
+            List<String> partitionKVs = Lists.newArrayList("e=7", "e=8", 
"e=9", "e=10", "e=11");
+            partitionKVs.forEach(
+                    partitionKV -> {
+                        String partitionPath =
+                                new Path(new Path(base, "d=2020-05-03"), 
partitionKV).toString();
+                        Assert.assertTrue(
+                                String.join("", committedPaths),

Review comment:
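       I don't think the error message is helpful, or even readable: 
`String.join("", committedPaths)` concatenates all committed paths with no 
separator, and it doesn't say which partition path was expected.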

##########
File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
##########
@@ -441,6 +470,93 @@ private void checkSuccessFiles(String path) {
         Assert.assertTrue(new File(new File(basePath, "e=11"), 
"_MY_SUCCESS").exists());
     }
 
+    private void testStreamingWriteWithCustomPartitionCommitPolicy(
+            String customPartitionCommitPolicyClassName) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100);
+        // avoid the job to restart infinitely
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1_000));
+
+        StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvInStreamingMode(env);
+        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tEnv.useCatalog(hiveCatalog.getName());
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+
+        try {
+            tEnv.executeSql("create database db1");
+            tEnv.useDatabase("db1");
+
+            // prepare source
+            List<Row> data =
+                    Arrays.asList(
+                            Row.of(1, "a", "b", "2020-05-03", "7"),
+                            Row.of(2, "p", "q", "2020-05-03", "8"),
+                            Row.of(3, "x", "y", "2020-05-03", "9"),
+                            Row.of(4, "x", "y", "2020-05-03", "10"),
+                            Row.of(5, "x", "y", "2020-05-03", "11"));
+            DataStream<Row> stream =
+                    env.addSource(
+                            new FiniteTestSource<>(data),
+                            new RowTypeInfo(
+                                    Types.INT,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING,
+                                    Types.STRING));
+            tEnv.createTemporaryView("my_table", stream, $("a"), $("b"), 
$("c"), $("d"), $("e"));
+
+            // DDL
+            tEnv.executeSql(
+                    "create external table sink_table (a int,b string,c string"
+                            + ") "
+                            + "partitioned by (d string,e string) "
+                            + " stored as textfile"
+                            + " TBLPROPERTIES ("
+                            + "'"
+                            + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()
+                            + "'='$d $e:00:00',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_DELAY.key()
+                            + "'='1h',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_POLICY_KIND.key()
+                            + "'='metastore,custom',"
+                            + "'"
+                            + SINK_PARTITION_COMMIT_POLICY_CLASS.key()
+                            + "'='"
+                            + customPartitionCommitPolicyClassName
+                            + "'"
+                            + ")");
+
+            // hive dialect only works with hive tables at the moment, switch 
to default dialect
+            tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            tEnv.sqlQuery("select * from 
my_table").executeInsert("sink_table").await();
+
+            // check committed partitions for CustomizedCommitPolicy
+            Set<String> committedPaths =
+                    
TestCustomCommitPolicy.getCommittedPartitionPathsAndReset();
+            String base =
+                    URI.create(
+                                    hiveCatalog
+                                            
.getHiveTable(ObjectPath.fromString("db1.sink_table"))
+                                            .getSd()
+                                            .getLocation())
+                            .getPath();
+            List<String> partitionKVs = Lists.newArrayList("e=7", "e=8", 
"e=9", "e=10", "e=11");
+            partitionKVs.forEach(
+                    partitionKV -> {
+                        String partitionPath =
+                                new Path(new Path(base, "d=2020-05-03"), 
partitionKV).toString();
+                        Assert.assertTrue(
+                                String.join("", committedPaths),
+                                committedPaths.contains(partitionPath));
+                    });
+        } finally {
+            tEnv.executeSql("drop database db1 cascade");

Review comment:
       ```suggestion
               tEnv.executeSql("drop database if exists db1 cascade");
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to