PatrickRen commented on code in PR #20382: URL: https://github.com/apache/flink/pull/20382#discussion_r936217253
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.lookup.cache.trigger; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC; +import static 
org.apache.flink.util.Preconditions.checkArgument; + +/** + * A trigger that reloads cache entries periodically with specified interval and {@link + * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen just one time. + */ +public class PeriodicCacheReloadTrigger implements CacheReloadTrigger { + private static final long serialVersionUID = 3828732577291369913L; Review Comment: The serial version UID should start from 1 for new classes ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/PeriodicCacheReloadTrigger.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source.lookup.cache.trigger; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; + +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.PERIODIC; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A trigger that reloads cache entries periodically with specified interval and {@link + * ScheduleMode}. If {@code reloadInterval} is zero, cache loading will happen just one time. 
+ */ +public class PeriodicCacheReloadTrigger implements CacheReloadTrigger { + private static final long serialVersionUID = 3828732577291369913L; + + private final Duration reloadInterval; + private final ScheduleMode scheduleMode; + + private transient ScheduledExecutorService scheduledExecutor; + + public PeriodicCacheReloadTrigger(Duration reloadInterval, ScheduleMode scheduleMode) { + checkArgument(!reloadInterval.isNegative(), "Reload interval can't be negative."); + this.reloadInterval = reloadInterval; + this.scheduleMode = scheduleMode; + } + + @VisibleForTesting + PeriodicCacheReloadTrigger( + Duration reloadInterval, + ScheduleMode scheduleMode, + ScheduledExecutorService scheduledExecutor) { + this(reloadInterval, scheduleMode); + this.scheduledExecutor = scheduledExecutor; + } + + @Override + public void open(CacheReloadTrigger.Context context) { + if (scheduledExecutor == null) { + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + } + if (reloadInterval.isZero()) { + scheduledExecutor.execute(context::triggerReload); + return; + } + switch (scheduleMode) { + case FIXED_RATE: + scheduledExecutor.scheduleAtFixedRate( + context::triggerReload, + 0, + reloadInterval.toMillis(), + TimeUnit.MILLISECONDS); + break; + case FIXED_DELAY: + scheduledExecutor.scheduleWithFixedDelay( + () -> { + try { + context.triggerReload().get(); + } catch (Exception e) { + throw new RuntimeException( + "Uncaught exception during the reload", e); + } + }, + 0, + reloadInterval.toMillis(), + TimeUnit.MILLISECONDS); + break; + default: + throw new IllegalArgumentException( + String.format("Unrecognized schedule mode \"%s\"", scheduleMode)); + } + } + + @Override + public void close() throws Exception { + scheduledExecutor.shutdownNow(); Review Comment: What about wrapping with a `if(scheduledExecutor != null)`? 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source.lookup.cache.trigger; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; + +import java.time.Clock; +import java.time.Duration; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.time.temporal.Temporal; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME; +import static 
org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A trigger that reloads at a specific time and repeats for the given interval in days. */ +public class TimedCacheReloadTrigger implements CacheReloadTrigger { + private static final long serialVersionUID = -8565574480311775185L; Review Comment: The serial version UID should start from 1 for new classes ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source.lookup.cache.trigger; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; + +import java.time.Clock; +import java.time.Duration; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.time.temporal.Temporal; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_RELOAD_STRATEGY; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.LookupCacheType.FULL; +import static org.apache.flink.table.connector.source.lookup.LookupOptions.ReloadStrategy.TIMED; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A trigger that reloads at a specific time and repeats for the given interval in days. 
*/ +public class TimedCacheReloadTrigger implements CacheReloadTrigger { + private static final long serialVersionUID = -8565574480311775185L; + + private final Temporal reloadTime; + private final int reloadIntervalInDays; + + private transient ScheduledExecutorService scheduledExecutor; + private transient Clock clock; // clock for testing purposes + + public TimedCacheReloadTrigger(OffsetTime reloadTime, int reloadIntervalInDays) { + this((Temporal) reloadTime, reloadIntervalInDays); + } + + public TimedCacheReloadTrigger(LocalTime reloadTime, int reloadIntervalInDays) { + this((Temporal) reloadTime, reloadIntervalInDays); + } + + private TimedCacheReloadTrigger(Temporal reloadTime, int reloadIntervalInDays) { + checkArgument( + reloadIntervalInDays > 0, + "Reload interval for Timed cache reload trigger must be at least 1 day."); + this.reloadTime = reloadTime; + this.reloadIntervalInDays = reloadIntervalInDays; + } + + @VisibleForTesting + TimedCacheReloadTrigger( + Temporal reloadTime, + int reloadIntervalInDays, + ScheduledExecutorService scheduledExecutor, + Clock clock) { + this(reloadTime, reloadIntervalInDays); + this.scheduledExecutor = scheduledExecutor; + this.clock = clock; + } + + @Override + public void open(Context context) { + if (scheduledExecutor == null) { + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + } + if (clock == null) { + clock = + reloadTime instanceof LocalTime + ? Clock.systemDefaultZone() + : Clock.system(((OffsetTime) reloadTime).getOffset()); + } + Temporal now = + reloadTime instanceof LocalTime ? 
LocalTime.now(clock) : OffsetTime.now(clock); + + Duration initialDelayDuration = Duration.between(now, reloadTime); + if (initialDelayDuration.isNegative()) { + // in case when reloadTime is less than current time, reload will happen next day + initialDelayDuration = initialDelayDuration.plus(1, ChronoUnit.DAYS); + } + scheduledExecutor.execute(context::triggerReload); // trigger first load operation + scheduledExecutor.scheduleAtFixedRate( + context::triggerReload, + initialDelayDuration.toMillis(), + Duration.ofDays(reloadIntervalInDays).toMillis(), + TimeUnit.MILLISECONDS); + } + + @Override + public void close() throws Exception { + scheduledExecutor.shutdownNow(); Review Comment: What about wrapping with a if(scheduledExecutor != null)? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/lookup/LookupOptions.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source.lookup; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger; +import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger; + +import java.time.Duration; + +/** Predefined options for lookup join. */ +public class LookupOptions { + public static final ConfigOption<LookupCacheType> CACHE_TYPE = + ConfigOptions.key("lookup.cache") + .enumType(LookupCacheType.class) + .defaultValue(LookupCacheType.NONE) + .withDescription("The caching strategy for this lookup table"); + + public static final ConfigOption<ReloadStrategy> FULL_CACHE_RELOAD_STRATEGY = + ConfigOptions.key("lookup.full-cache.reload-strategy") + .enumType(ReloadStrategy.class) + .defaultValue(ReloadStrategy.PERIODIC) + .withDescription( + "Defines which strategy to use to reload full cache: " + + "PERIODIC - cache is reloaded with fixed intervals without initial delay; " + + "TIMED - cache is reloaded at specified time with fixed intervals multiples of one day."); + + public static final ConfigOption<Duration> FULL_CACHE_PERIODIC_RELOAD_INTERVAL = + ConfigOptions.key("lookup.full-cache.periodic-reload.interval") + .durationType() + .defaultValue(Duration.ZERO) Review Comment: I'm not sure if it is a good idea to have a default value with 0 here. If users intend to reload the cache periodically, the interval should be set in the configuration to fully express the reloading strategy. Maybe we could just leave it without default value here and have a check in the `PeriodicCacheReloadTrigger`. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
