This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6266a12c2 [GOBBLIN-1927] Add topic validation support in KafkaSource,
and add TopicNameValidator (#3793)
6266a12c2 is described below
commit 6266a12c29c464f64d69cbed5c0fce9b0121870e
Author: Tao Qin <[email protected]>
AuthorDate: Wed Oct 18 14:46:45 2023 -0700
[GOBBLIN-1927] Add topic validation support in KafkaSource, and add
TopicNameValidator (#3793)
* * Add generic topic validation support
* Add the first validator TopicNameValidator into the validator chain, as a
refactor of existing codes
* Refine to address comments
* Refine
---------
Co-authored-by: Tao Qin <[email protected]>
---
.../extractor/extract/kafka/KafkaSource.java | 11 +-
.../kafka/validator/TopicNameValidator.java | 42 +++++++
.../kafka/validator/TopicValidatorBase.java | 33 ++++++
.../extract/kafka/validator/TopicValidators.java | 131 +++++++++++++++++++++
.../extractor/extract/kafka/KafkaSourceTest.java | 33 ++++++
.../kafka/validator/TopicValidatorsTest.java | 112 ++++++++++++++++++
.../org/apache/gobblin/util/ExecutorsUtils.java | 15 +++
7 files changed, 376 insertions(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 0bc4c948e..0bca916a7 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -61,6 +61,7 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
+import
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -218,7 +219,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
- List<KafkaTopic> topics = getFilteredTopics(state);
+ List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state),
state);
this.topicsToProcess =
topics.stream().map(KafkaTopic::getName).collect(toSet());
for (String topic : this.topicsToProcess) {
@@ -802,6 +803,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
protected List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state,
TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state,
TOPIC_WHITELIST);
+ // TODO: replace this with TopicNameValidator in the config once
TopicValidators is rolled out.
if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true))
{
blacklist.add(Pattern.compile(".*\\..*"));
}
@@ -815,6 +817,13 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT,
this.failToGetOffsetCount);
}
+ /**
+ * Return topics that pass all the topic validators.
+ */
+ protected List<KafkaTopic> getValidTopics(List<KafkaTopic> topics,
SourceState state) {
+ return new TopicValidators(state).validate(topics);
+ }
+
/**
* This class contains startOffset, earliestOffset and latestOffset for a
Kafka partition.
*/
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
new file mode 100644
index 000000000..c8dd8223f
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * A topic validator that validates the topic name
+ */
+public class TopicNameValidator extends TopicValidatorBase {
+ private static final String DOT = ".";
+
+ public TopicNameValidator(State state) {
+ super(state);
+ }
+
+ /**
+ * Check if a topic name is valid, current rules are:
+ * 1. must not contain "."
+ * @param topic the topic to be validated
+ * @return true if the topic name is valid (aka. doesn't contain ".")
+ */
+ @Override
+ public boolean validate(KafkaTopic topic) throws Exception {
+ return !topic.getName().contains(DOT);
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
new file mode 100644
index 000000000..69c5bc92a
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * The base class of a topic validator
+ */
+public abstract class TopicValidatorBase {
+ protected State state;
+
+ public TopicValidatorBase(State sourceState) {
+ this.state = sourceState;
+ }
+
+ public abstract boolean validate(KafkaTopic topic) throws Exception;
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
new file mode 100644
index 000000000..fbed07c76
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The TopicValidators contains a list of {@link TopicValidatorBase} that
validate topics.
+ * To enable it, add below settings in the config:
+ *
gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name...
+ */
+@Slf4j
+public class TopicValidators {
+ public static final String VALIDATOR_CLASSES_KEY =
"gobblin.kafka.topicValidators";
+
+ private static long DEFAULTL_TIMEOUT = 10L;
+
+ private static TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
+ private final List<TopicValidatorBase> validators = new ArrayList<>();
+
+ private final State state;
+
+ public TopicValidators(State state) {
+ this.state = state;
+ for (String validatorClassName :
state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) {
+ try {
+
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class,
validatorClassName,
+ state));
+ } catch (Exception e) {
+ log.error("Failed to create topic validator: {}, due to {}",
validatorClassName, e);
+ }
+ }
+ }
+
+ /**
+ * Validate topics with all the internal validators. The default timeout is
set to 1 hour.
+ * Note:
+ * 1. the validations for every topic run in parallel.
+ * 2. when timeout happens, un-validated topics are still treated as
"valid".
+ * @param topics the topics to be validated
+ * @return the topics that pass all the validators
+ */
+ public List<KafkaTopic> validate(List<KafkaTopic> topics) {
+ return validate(topics, DEFAULTL_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
+ }
+
+ /**
+ * Validate topics with all the internal validators.
+ * Note:
+ * 1. the validations for every topic run in parallel.
+ * 2. when timeout happens, un-validated topics are still treated as
"valid".
+ * @param topics the topics to be validated
+ * @param timeout the timeout for the validation
+ * @param timeoutUnit the time unit for the timeout
+ * @return the topics that pass all the validators
+ */
+ public List<KafkaTopic> validate(List<KafkaTopic> topics, long timeout,
TimeUnit timeoutUnit) {
+ int numOfThreads =
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
+
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
+
+ // Tasks running in the thread pool will have the same access control and
class loader settings as current thread
+ ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads,
ExecutorsUtils.newPrivilegedThreadFactory(
+ Optional.of(log)));
+
+ List<Future<Boolean>> results = new ArrayList<>();
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ for (KafkaTopic topic : topics) {
+ results.add(threadPool.submit(() -> validate(topic)));
+ }
+ ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(log),
timeout, timeoutUnit);
+ log.info(String.format("Validate %d topics in %d seconds", topics.size(),
stopwatch.elapsed(TimeUnit.SECONDS)));
+
+ List<KafkaTopic> validTopics = new ArrayList<>();
+ for (int i = 0; i < results.size(); ++i) {
+ try {
+ if (results.get(i).get()) {
+ validTopics.add(topics.get(i));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Failed to validate topic: {}, treat it as a valid topic",
topics.get(i));
+ validTopics.add(topics.get(i));
+ }
+ }
+ return validTopics;
+ }
+
+ /**
+ * Validates a single topic with all the internal validators
+ */
+ private boolean validate(KafkaTopic topic) throws Exception {
+ log.info("Validating topic {} in thread: {}", topic,
Thread.currentThread().getName());
+ for (TopicValidatorBase validator : this.validators) {
+ if (!validator.validate(topic)) {
+ log.warn("KafkaTopic: {} doesn't pass the validator: {}", topic,
validator.getClass().getName());
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
index 9992d4442..c26872e1c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator;
+import
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -56,6 +59,36 @@ public class KafkaSourceTest {
Assert.assertEquals(new
TestKafkaSource(testKafkaClient).getFilteredTopics(state),
toKafkaTopicList(allTopics.subList(0, 3)));
}
+ @Test
+ public void testTopicValidators() {
+ TestKafkaClient testKafkaClient = new TestKafkaClient();
+ List<String> allTopics = Arrays.asList(
+ "Topic1", "topic-v2", "topic3", // allowed
+ "topic-with.period-in_middle", ".topic-with-period-at-start",
"topicWithPeriodAtEnd.", //period topics
+ "not-allowed-topic");
+ testKafkaClient.testTopics = allTopics;
+ KafkaSource kafkaSource = new TestKafkaSource(testKafkaClient);
+
+ SourceState state = new SourceState();
+ state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
+ state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
+ List<KafkaTopic> topicsToValidate = kafkaSource.getFilteredTopics(state);
+
+ // Test without TopicValidators in the state
+
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
state),
+ toKafkaTopicList(allTopics.subList(0, 6))));
+
+ // Test empty TopicValidators in the state
+ state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, "");
+
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
state),
+ toKafkaTopicList(allTopics.subList(0, 6))));
+
+ // Test TopicValidators with TopicNameValidator in the state
+ state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY,
TopicNameValidator.class.getName());
+
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
state),
+ toKafkaTopicList(allTopics.subList(0, 3))));
+ }
+
public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
return topicNames.stream().map(topicName -> new KafkaTopic(topicName,
Collections.emptyList())).collect(Collectors.toList());
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
new file mode 100644
index 000000000..2691ae112
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TopicValidatorsTest {
+ @Test
+ public void testTopicValidators() {
+ List<String> allTopics = Arrays.asList(
+ "topic1", "topic2", // allowed
+ "topic-with.period-in_middle", ".topic-with-period-at-start",
"topicWithPeriodAtEnd.", // bad topics
+ "topic3", "topic4"); // in deny list
+ List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+
+ State state = new State();
+
+ // Without any topic validators
+ List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics);
+ Assert.assertEquals(validTopics.size(), 7);
+
+ // Use 2 topic validators: TopicNameValidator and DenyListValidator
+ String validatorsToUse = String.join(",", ImmutableList.of(
+ TopicNameValidator.class.getName(),
DenyListValidator.class.getName()));
+ state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse);
+ validTopics = new TopicValidators(state).validate(topics);
+
+ Assert.assertEquals(validTopics.size(), 2);
+ Assert.assertTrue(validTopics.stream().anyMatch(topic ->
topic.getName().equals("topic1")));
+ Assert.assertTrue(validTopics.stream().anyMatch(topic ->
topic.getName().equals("topic2")));
+ }
+
+ @Test
+ public void testValidatorTimeout() {
+ List<String> allTopics = Arrays.asList("topic1", "topic2", "topic3");
+ List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+ State state = new State();
+ state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY,
RejectEverythingValidator.class.getName());
+ List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics,
5, TimeUnit.SECONDS);
+ Assert.assertEquals(validTopics.size(), 1); // topic 2 times out, it
should be treated as a valid topic
+ Assert.assertEquals(validTopics.get(0).getName(), "topic2");
+ }
+
+ private List<KafkaTopic> buildKafkaTopics(List<String> topics) {
+ return topics.stream()
+ .map(topicName -> new KafkaTopic(topicName, Collections.emptyList()))
+ .collect(Collectors.toList());
+ }
+
+ // A TopicValidator class to mimic a deny list
+ public static class DenyListValidator extends TopicValidatorBase {
+ Set<String> denyList = ImmutableSet.of("topic3", "topic4");
+
+ public DenyListValidator(State state) {
+ super(state);
+ }
+
+ @Override
+ public boolean validate(KafkaTopic topic) {
+ return !this.denyList.contains(topic.getName());
+ }
+ }
+
+ // A validator that always returns false when validate() is called.
+ // Sleep for 5 sec when processing topic2 to simulate a slow validation.
+ public static class RejectEverythingValidator extends TopicValidatorBase {
+
+ public RejectEverythingValidator(State state) {
+ super(state);
+ }
+
+ @Override
+ public boolean validate(KafkaTopic topic) {
+ if (!topic.getName().equals("topic2")) {
+ return false;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+ }
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
index b05674a95..09f2f00b4 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
@@ -103,6 +103,21 @@ public class ExecutorsUtils {
return newThreadFactory(new ThreadFactoryBuilder().setDaemon(true),
logger, nameFormat);
}
+ /**
+ * Get a new {@link ThreadFactory} that uses a {@link
LoggingUncaughtExceptionHandler}
+ * to handle uncaught exceptions.
+ * Tasks running within such threads will have the same access control and
class loader settings as the
+ * thread that invokes this method.
+ *
+ * @param logger an {@link Optional} wrapping the {@link Logger} that the
+ * {@link LoggingUncaughtExceptionHandler} uses to log
uncaught exceptions thrown in threads
+ * @return a new {@link ThreadFactory}
+ */
+ public static ThreadFactory newPrivilegedThreadFactory(Optional<Logger>
logger) {
+ return newThreadFactory(new
ThreadFactoryBuilder().setThreadFactory(Executors.privilegedThreadFactory()),
logger,
+ Optional.<String>absent());
+ }
+
private static ThreadFactory newThreadFactory(ThreadFactoryBuilder builder,
Optional<Logger> logger,
Optional<String> nameFormat) {
if (nameFormat.isPresent()) {