This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 09ec2ae [FLINK-20486][hive] Hive temporal join shouldn't require a minimal 1h monitor interval
09ec2ae is described below
commit 09ec2ae626acac0c17374a77d49f290ae5289f7d
Author: Leonard Xu <[email protected]>
AuthorDate: Mon Dec 7 20:38:32 2020 +0800
[FLINK-20486][hive] Hive temporal join shouldn't require a minimal 1h monitor interval
This closes #14310
---
.../connectors/hive/HiveLookupTableSource.java | 23 +++++++++-------
.../hive/HiveDynamicTableFactoryTest.java | 31 ++++++++++------------
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index f6c3e2b..0d2bf1e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
@@ -67,6 +69,7 @@ import static
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOUR
*/
public class HiveLookupTableSource extends HiveTableSource implements
LookupTableSource {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveLookupTableSource.class);
private static final Duration DEFAULT_LOOKUP_MONITOR_INTERVAL =
Duration.ofHours(1L);
private final Configuration configuration;
private Duration hiveTableReloadInterval;
@@ -114,16 +117,16 @@ public class HiveLookupTableSource extends
HiveTableSource implements LookupTabl
Duration monitorInterval =
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
? DEFAULT_LOOKUP_MONITOR_INTERVAL
:
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
- Preconditions.checkArgument(
- monitorInterval.toMillis() >=
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
- String.format(
- "Currently the value of
'%s' is required bigger or equal to default value '%s' " +
- "when
set '%s' to 'latest', but actual is '%s'",
-
STREAMING_SOURCE_MONITOR_INTERVAL.key(),
-
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
-
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
-
monitorInterval.toMillis())
- );
+
+ if (monitorInterval.toMillis() <
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis()) {
+ LOG.warn(String.format(
+ "Currently the recommended value of
'%s' is at least '%s' when set '%s' to 'latest'," +
+ " but actual is '%s', this may
produce big pressure to hive metastore.",
+ STREAMING_SOURCE_MONITOR_INTERVAL.key(),
+
DEFAULT_LOOKUP_MONITOR_INTERVAL.toMillis(),
+
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+ monitorInterval.toMillis()));
+ }
hiveTableReloadInterval = monitorInterval;
} else {
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
index 1edd19f..3baa533 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDynamicTableFactoryTest.java
@@ -188,26 +188,23 @@ public class HiveDynamicTableFactoryTest {
assertEquals(configuration1.get(STREAMING_SOURCE_PARTITION_ORDER),
"partition-time");
assertEquals(configuration1.get(PARTITION_TIME_EXTRACTOR_KIND),
"custom");
assertEquals(configuration1.get(PARTITION_TIME_EXTRACTOR_CLASS),
"path.to..TimeExtractor");
- }
- @Test
- public void testInvalidOptions() {
tableEnv.executeSql(String.format(
- "create table table8 (x int, y string, z int)" +
- " tblproperties ('%s' = 'true',
'%s' = 'latest', '%s' = '5min')",
- STREAMING_SOURCE_ENABLE.key(),
- STREAMING_SOURCE_PARTITION_INCLUDE.key(),
- STREAMING_SOURCE_MONITOR_INTERVAL.key()));
- try {
- getTableSource("table8");
- } catch (Throwable t) {
- assertTrue(ExceptionUtils.findThrowableWithMessage(t,
- "Currently the value of
'streaming-source.monitor-interval' is required bigger or " +
- "equal to default value
'3600000' when set 'streaming-source.partition.include' to 'latest'," +
- " but actual is
'300000'")
- .isPresent());
- }
+ "create table table8 (x int, y string, z int)" +
+ " tblproperties ('%s' = 'true', '%s' =
'latest', '%s' = '5min')",
+ STREAMING_SOURCE_ENABLE.key(),
+ STREAMING_SOURCE_PARTITION_INCLUDE.key(),
+ STREAMING_SOURCE_MONITOR_INTERVAL.key()));
+ DynamicTableSource tableSource4 = getTableSource("table8");
+ assertTrue(tableSource4 instanceof HiveLookupTableSource);
+ HiveLookupTableSource lookupTableSource4 =
(HiveLookupTableSource) tableSource4;
+ Configuration configuration4 = new Configuration();
+
lookupTableSource4.catalogTable.getOptions().forEach(configuration4::setString);
+
assertEquals(configuration4.get(STREAMING_SOURCE_MONITOR_INTERVAL),
Duration.ofMinutes(5L));
+ }
+ @Test
+ public void testInvalidOptions() throws Exception {
tableEnv.executeSql(String.format(
"create table table9 (x int, y string, z int)" +
" tblproperties ('%s' = 'true',
'%s' = 'latest', '%s' = '120min', '%s' = '1970-00-01')",