This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 09ec2ae  [FLINK-20486][hive] Hive temporal join shouldn't require a 
minimal 1h monitor interval
09ec2ae is described below

commit 09ec2ae626acac0c17374a77d49f290ae5289f7d
Author: Leonard Xu <[email protected]>
AuthorDate: Mon Dec 7 20:38:32 2020 +0800

    [FLINK-20486][hive] Hive temporal join shouldn't require a minimal 1h 
monitor interval
    
    This closes #14310
---
 .../connectors/hive/HiveLookupTableSource.java     | 23 +++++++++-------
 .../hive/HiveDynamicTableFactoryTest.java          | 31 ++++++++++------------
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index f6c3e2b..0d2bf1e 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -67,6 +69,7 @@ import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOUR
  */
 public class HiveLookupTableSource extends HiveTableSource implements 
LookupTableSource {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveLookupTableSource.class);
        private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL = 
Duration.ofHours(1L);
        private final Configuration configuration;
        private Duration hiveTableReloadInterval;
@@ -114,16 +117,16 @@ public class HiveLookupTableSource extends 
HiveTableSource implements LookupTabl
                        Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
                                        ? DEFAULT_LOOKUP_MONITOR_INTERVAL
                                        : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-                       Preconditions.checkArgument(
-                                       monitorInterval.toMillis() >= 
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
-                                       String.format(
-                                                       "Currently the value of 
'%s' is required bigger or equal to default value '%s' " +
-                                                                       "when 
set '%s' to 'latest', but actual is '%s'",
-                                                       
STREAMING_SOURCE_MONITOR_INTERVAL.key(),
-                                                       
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
-                                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
-                                                       
monitorInterval.toMillis())
-                       );
+
+                       if (monitorInterval.toMillis() < 
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis()) {
+                               LOG.warn(String.format(
+                                       "Currently the recommended value of 
'%s' is at least '%s' when set '%s' to 'latest'," +
+                                               " but actual is '%s', this may 
produce big pressure to hive metastore.",
+                                       STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+                                       
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+                                       
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+                                       monitorInterval.toMillis()));
+                       }
 
                        hiveTableReloadInterval = monitorInterval;
                } else {
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
index 1edd19f..3baa533 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
@@ -188,26 +188,23 @@ public class HiveDynamicTableFactoryTest {
                
assertEquals(configuration1.get(STREAMING_SOURCE_PARTITION_ORDER), 
"partition-time");
                assertEquals(configuration1.get(PARTITION_TIME_EXTRACTOR_KIND), 
"custom");
                
assertEquals(configuration1.get(PARTITION_TIME_EXTRACTOR_CLASS), 
"path.to..TimeExtractor");
-       }
 
-       @Test
-       public void testInvalidOptions() {
                tableEnv.executeSql(String.format(
-                               "create table table8 (x int, y string, z int)" +
-                                               " tblproperties ('%s' = 'true', 
'%s' = 'latest', '%s' = '5min')",
-                               STREAMING_SOURCE_ENABLE.key(),
-                               STREAMING_SOURCE_PARTITION_INCLUDE.key(),
-                               STREAMING_SOURCE_MONITOR_INTERVAL.key()));
-               try {
-                       getTableSource("table8");
-               } catch (Throwable t) {
-                       assertTrue(ExceptionUtils.findThrowableWithMessage(t,
-                                       "Currently the value of 
'streaming-source.monitor-interval' is required bigger or " +
-                                                       "equal to default value 
'3600000' when set 'streaming-source.partition.include' to 'latest'," +
-                                                       " but actual is 
'300000'")
-                                       .isPresent());
-               }
+                       "create table table8 (x int, y string, z int)" +
+                               " tblproperties ('%s' = 'true', '%s' = 
'latest', '%s' = '5min')",
+                       STREAMING_SOURCE_ENABLE.key(),
+                       STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+                       STREAMING_SOURCE_MONITOR_INTERVAL.key()));
+               DynamicTableSource tableSource4 = getTableSource("table8");
+               assertTrue(tableSource4 instanceof HiveLookupTableSource);
+               HiveLookupTableSource lookupTableSource4 = 
(HiveLookupTableSource) tableSource4;
+               Configuration configuration4 = new Configuration();
+               
lookupTableSource4.catalogTable.getOptions().forEach(configuration4::setString);
+               
assertEquals(configuration4.get(STREAMING_SOURCE_MONITOR_INTERVAL), 
Duration.ofMinutes(5L));
+       }
 
+       @Test
+       public void testInvalidOptions() throws Exception {
                tableEnv.executeSql(String.format(
                                "create table table9 (x int, y string, z int)" +
                                                " tblproperties ('%s' = 'true', 
'%s' = 'latest', '%s' = '120min', '%s' = '1970-00-01')",

Reply via email to