PatrickRen commented on code in PR #20382:
URL: https://github.com/apache/flink/pull/20382#discussion_r936217253


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A trigger that reloads cache entries periodically with specified interval 
and {@link
+ * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen 
just one time.
+ */
+public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = 3828732577291369913L;

Review Comment:
   The `serialVersionUID` should start from 1 for new classes, i.e. `private static final long serialVersionUID = 1L;`.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A trigger that reloads cache entries periodically with specified interval 
and {@link
+ * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen 
just one time.
+ */
+public class PeriodicCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = 3828732577291369913L;
+
+    private final Duration reloadInterval;
+    private final ScheduleMode scheduleMode;
+
+    private transient ScheduledExecutorService scheduledExecutor;
+
+    public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode 
scheduleMode) {
+        checkArgument(!reloadInterval.isNegative(), "Reload interval can't be 
negative.");
+        this.reloadInterval = reloadInterval;
+        this.scheduleMode = scheduleMode;
+    }
+
+    @VisibleForTesting
+    PeriodicCacheReloadTrigger(
+            Duration reloadInterval,
+            ScheduleMode scheduleMode,
+            ScheduledExecutorService scheduledExecutor) {
+        this(reloadInterval, scheduleMode);
+        this.scheduledExecutor = scheduledExecutor;
+    }
+
+    @Override
+    public void open(CacheReloadTrigger.Context context) {
+        if (scheduledExecutor == null) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        }
+        if (reloadInterval.isZero()) {
+            scheduledExecutor.execute(context::triggerReload);
+            return;
+        }
+        switch (scheduleMode) {
+            case FIXED_RATE:
+                scheduledExecutor.scheduleAtFixedRate(
+                        context::triggerReload,
+                        0,
+                        reloadInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
+                break;
+            case FIXED_DELAY:
+                scheduledExecutor.scheduleWithFixedDelay(
+                        () -> {
+                            try {
+                                context.triggerReload().get();
+                            } catch (Exception e) {
+                                throw new RuntimeException(
+                                        "Uncaught exception during the 
reload", e);
+                            }
+                        },
+                        0,
+                        reloadInterval.toMillis(),
+                        TimeUnit.MILLISECONDS);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unrecognized schedule mode \"%s\"", 
scheduleMode));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutor.shutdownNow();

Review Comment:
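   What about wrapping this in an `if (scheduledExecutor != null)` check? The executor is transient and is only created in `open()`, so `close()` would throw a `NullPointerException` if it is called before `open()`.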



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.Temporal;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A trigger that reloads at a specific time and repeats for the given 
interval in days. */
+public class TimedCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = -8565574480311775185L;

Review Comment:
   The `serialVersionUID` should start from 1 for new classes, i.e. `private static final long serialVersionUID = 1L;`.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup.cache.trigger;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.Temporal;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL;
+import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A trigger that reloads at a specific time and repeats for the given 
interval in days. */
+public class TimedCacheReloadTrigger implements CacheReloadTrigger {
+    private static final long serialVersionUID = -8565574480311775185L;
+
+    private final Temporal reloadTime;
+    private final int reloadIntervalInDays;
+
+    private transient ScheduledExecutorService scheduledExecutor;
+    private transient Clock clock; // clock for testing purposes
+
+    public TimedCacheReloadTrigger(OffsetTime reloadTime, int 
reloadIntervalInDays) {
+        this((Temporal) reloadTime, reloadIntervalInDays);
+    }
+
+    public TimedCacheReloadTrigger(LocalTime reloadTime, int 
reloadIntervalInDays) {
+        this((Temporal) reloadTime, reloadIntervalInDays);
+    }
+
+    private TimedCacheReloadTrigger(Temporal reloadTime, int 
reloadIntervalInDays) {
+        checkArgument(
+                reloadIntervalInDays > 0,
+                "Reload interval for Timed cache reload trigger must be at 
least 1 day.");
+        this.reloadTime = reloadTime;
+        this.reloadIntervalInDays = reloadIntervalInDays;
+    }
+
+    @VisibleForTesting
+    TimedCacheReloadTrigger(
+            Temporal reloadTime,
+            int reloadIntervalInDays,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock) {
+        this(reloadTime, reloadIntervalInDays);
+        this.scheduledExecutor = scheduledExecutor;
+        this.clock = clock;
+    }
+
+    @Override
+    public void open(Context context) {
+        if (scheduledExecutor == null) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        }
+        if (clock == null) {
+            clock =
+                    reloadTime instanceof LocalTime
+                            ? Clock.systemDefaultZone()
+                            : Clock.system(((OffsetTime) 
reloadTime).getOffset());
+        }
+        Temporal now =
+                reloadTime instanceof LocalTime ? LocalTime.now(clock) : 
OffsetTime.now(clock);
+
+        Duration initialDelayDuration = Duration.between(now, reloadTime);
+        if (initialDelayDuration.isNegative()) {
+            // in case when reloadTime is less than current time, reload will 
happen next day
+            initialDelayDuration = initialDelayDuration.plus(1, 
ChronoUnit.DAYS);
+        }
+        scheduledExecutor.execute(context::triggerReload); // trigger first 
load operation
+        scheduledExecutor.scheduleAtFixedRate(
+                context::triggerReload,
+                initialDelayDuration.toMillis(),
+                Duration.ofDays(reloadIntervalInDays).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        scheduledExecutor.shutdownNow();

Review Comment:
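   What about wrapping this in an `if (scheduledExecutor != null)` check? The executor is transient and is only created in `open()`, so `close()` would throw a `NullPointerException` if it is called before `open()`.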



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.lookup;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
+import 
org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
+
+import java.time.Duration;
+
+/** Predefined options for lookup join. */
+public class LookupOptions {
+    public static final ConfigOption<LookupCacheType> CACHE_TYPE =
+            ConfigOptions.key("lookup.cache")
+                    .enumType(LookupCacheType.class)
+                    .defaultValue(LookupCacheType.NONE)
+                    .withDescription("The caching strategy for this lookup 
table");
+
+    public static final ConfigOption<ReloadStrategy> 
FULL_CACHE_RELOAD_STRATEGY =
+            ConfigOptions.key("lookup.full-cache.reload-strategy")
+                    .enumType(ReloadStrategy.class)
+                    .defaultValue(ReloadStrategy.PERIODIC)
+                    .withDescription(
+                            "Defines which strategy to use to reload full 
cache: "
+                                    + "PERIODIC - cache is reloaded with fixed 
intervals without initial delay; "
+                                    + "TIMED - cache is reloaded at specified 
time with fixed intervals multiples of one day.");
+
+    public static final ConfigOption<Duration> 
FULL_CACHE_PERIODIC_RELOAD_INTERVAL =
+            ConfigOptions.key("lookup.full-cache.periodic-reload.interval")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)

Review Comment:
   I'm not sure it is a good idea to have a default value of 0 here. If
users intend to reload the cache periodically, the interval should be set
explicitly in the configuration to fully express the reloading strategy. Maybe
we could leave this option without a default value and add a check in
`PeriodicCacheReloadTrigger`. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to