This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6266a12c2 [GOBBLIN-1927] Add topic validation support in KafkaSource, 
and add TopicNameValidator (#3793)
6266a12c2 is described below

commit 6266a12c29c464f64d69cbed5c0fce9b0121870e
Author: Tao Qin <[email protected]>
AuthorDate: Wed Oct 18 14:46:45 2023 -0700

    [GOBBLIN-1927] Add topic validation support in KafkaSource, and add 
TopicNameValidator (#3793)
    
    * * Add generic topic validation support
    * Add the first validator TopicNameValidator into the validator chain, as a 
refactor of existing codes
    
    * Refine to address comments
    
    * Refine
    
    ---------
    
    Co-authored-by: Tao Qin <[email protected]>
---
 .../extractor/extract/kafka/KafkaSource.java       |  11 +-
 .../kafka/validator/TopicNameValidator.java        |  42 +++++++
 .../kafka/validator/TopicValidatorBase.java        |  33 ++++++
 .../extract/kafka/validator/TopicValidators.java   | 131 +++++++++++++++++++++
 .../extractor/extract/kafka/KafkaSourceTest.java   |  33 ++++++
 .../kafka/validator/TopicValidatorsTest.java       | 112 ++++++++++++++++++
 .../org/apache/gobblin/util/ExecutorsUtils.java    |  15 +++
 7 files changed, 376 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 0bc4c948e..0bca916a7 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -61,6 +61,7 @@ import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
+import 
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
 import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -218,7 +219,7 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
 
       this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
 
-      List<KafkaTopic> topics = getFilteredTopics(state);
+      List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), 
state);
       this.topicsToProcess = 
topics.stream().map(KafkaTopic::getName).collect(toSet());
 
       for (String topic : this.topicsToProcess) {
@@ -802,6 +803,7 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
   protected List<KafkaTopic> getFilteredTopics(SourceState state) {
     List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, 
TOPIC_BLACKLIST);
     List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, 
TOPIC_WHITELIST);
+    // TODO: replace this with TopicNameValidator in the config once 
TopicValidators is rolled out.
     if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) 
{
       blacklist.add(Pattern.compile(".*\\..*"));
     }
@@ -815,6 +817,13 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
     state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, 
this.failToGetOffsetCount);
   }
 
+  /**
+   * Return topics that pass all the topic validators.
+   */
+  protected List<KafkaTopic> getValidTopics(List<KafkaTopic> topics, 
SourceState state) {
+    return new TopicValidators(state).validate(topics);
+  }
+
   /**
    * This class contains startOffset, earliestOffset and latestOffset for a 
Kafka partition.
    */
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
new file mode 100644
index 000000000..c8dd8223f
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * A topic validator that validates the topic name
+ */
+public class TopicNameValidator extends TopicValidatorBase {
+  private static final String DOT = ".";
+
+  public TopicNameValidator(State state) {
+    super(state);
+  }
+
+  /**
+   * Check if a topic name is valid, current rules are:
+   *  1. must not contain "."
+   * @param topic the topic to be validated
+   * @return true if the topic name is valid (aka. doesn't contain ".")
+   */
+  @Override
+  public boolean validate(KafkaTopic topic) throws Exception {
+    return !topic.getName().contains(DOT);
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
new file mode 100644
index 000000000..69c5bc92a
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * The base class of a topic validator
+ */
+public abstract class TopicValidatorBase {
+  protected State state;
+
+  public TopicValidatorBase(State sourceState) {
+    this.state = sourceState;
+  }
+
+  public abstract boolean validate(KafkaTopic topic) throws Exception;
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
new file mode 100644
index 000000000..fbed07c76
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The TopicValidators contains a list of {@link TopicValidatorBase} that 
validate topics.
+ * To enable it, add below settings in the config:
+ *   
gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name...
+ */
+@Slf4j
+public class TopicValidators {
+  public static final String VALIDATOR_CLASSES_KEY = 
"gobblin.kafka.topicValidators";
+
+  private static long DEFAULTL_TIMEOUT = 10L;
+
+  private static TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
+  private final List<TopicValidatorBase> validators = new ArrayList<>();
+
+  private final State state;
+
+  public TopicValidators(State state) {
+    this.state = state;
+    for (String validatorClassName : 
state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) {
+      try {
+        
this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class,
 validatorClassName,
+            state));
+      } catch (Exception e) {
+        log.error("Failed to create topic validator: {}, due to {}", 
validatorClassName, e);
+      }
+    }
+  }
+
+  /**
+   * Validate topics with all the internal validators. The default timeout is 
set to 1 hour.
+   * Note:
+   *   1. the validations for every topic run in parallel.
+   *   2. when timeout happens, un-validated topics are still treated as 
"valid".
+   * @param topics the topics to be validated
+   * @return the topics that pass all the validators
+   */
+  public List<KafkaTopic> validate(List<KafkaTopic> topics) {
+    return validate(topics, DEFAULTL_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
+  }
+
+  /**
+   * Validate topics with all the internal validators.
+   * Note:
+   *   1. the validations for every topic run in parallel.
+   *   2. when timeout happens, un-validated topics are still treated as 
"valid".
+   * @param topics the topics to be validated
+   * @param timeout the timeout for the validation
+   * @param timeoutUnit the time unit for the timeout
+   * @return the topics that pass all the validators
+   */
+  public List<KafkaTopic> validate(List<KafkaTopic> topics, long timeout, 
TimeUnit timeoutUnit) {
+    int numOfThreads = 
state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
+        
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
+
+    // Tasks running in the thread pool will have the same access control and 
class loader settings as current thread
+    ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, 
ExecutorsUtils.newPrivilegedThreadFactory(
+        Optional.of(log)));
+
+    List<Future<Boolean>> results = new ArrayList<>();
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    for (KafkaTopic topic : topics) {
+      results.add(threadPool.submit(() -> validate(topic)));
+    }
+    ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(log), 
timeout, timeoutUnit);
+    log.info(String.format("Validate %d topics in %d seconds", topics.size(), 
stopwatch.elapsed(TimeUnit.SECONDS)));
+
+    List<KafkaTopic> validTopics = new ArrayList<>();
+    for (int i = 0; i < results.size(); ++i) {
+      try {
+        if (results.get(i).get()) {
+          validTopics.add(topics.get(i));
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        log.warn("Failed to validate topic: {}, treat it as a valid topic", 
topics.get(i));
+        validTopics.add(topics.get(i));
+      }
+    }
+    return validTopics;
+  }
+
+  /**
+   * Validates a single topic with all the internal validators
+   */
+  private boolean validate(KafkaTopic topic) throws Exception {
+    log.info("Validating topic {} in thread: {}", topic, 
Thread.currentThread().getName());
+    for (TopicValidatorBase validator : this.validators) {
+      if (!validator.validate(topic)) {
+        log.warn("KafkaTopic: {} doesn't pass the validator: {}", topic, 
validator.getClass().getName());
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
index 9992d4442..c26872e1c 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
+import 
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator;
+import 
org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -56,6 +59,36 @@ public class KafkaSourceTest {
     Assert.assertEquals(new 
TestKafkaSource(testKafkaClient).getFilteredTopics(state), 
toKafkaTopicList(allTopics.subList(0, 3)));
   }
 
+  @Test
+  public void testTopicValidators() {
+    TestKafkaClient testKafkaClient = new TestKafkaClient();
+    List<String> allTopics = Arrays.asList(
+        "Topic1", "topic-v2", "topic3", // allowed
+        "topic-with.period-in_middle", ".topic-with-period-at-start", 
"topicWithPeriodAtEnd.", //period topics
+        "not-allowed-topic");
+    testKafkaClient.testTopics = allTopics;
+    KafkaSource kafkaSource = new TestKafkaSource(testKafkaClient);
+
+    SourceState state = new SourceState();
+    state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
+    state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
+    List<KafkaTopic> topicsToValidate = kafkaSource.getFilteredTopics(state);
+
+    // Test without TopicValidators in the state
+    
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
 state),
+        toKafkaTopicList(allTopics.subList(0, 6))));
+
+    // Test empty TopicValidators in the state
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, "");
+    
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
 state),
+        toKafkaTopicList(allTopics.subList(0, 6))));
+
+    // Test TopicValidators with TopicNameValidator in the state
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, 
TopicNameValidator.class.getName());
+    
Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate,
 state),
+        toKafkaTopicList(allTopics.subList(0, 3))));
+  }
+
   public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
     return topicNames.stream().map(topicName -> new KafkaTopic(topicName, 
Collections.emptyList())).collect(Collectors.toList());
   }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
new file mode 100644
index 000000000..2691ae112
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TopicValidatorsTest {
+  @Test
+  public void testTopicValidators() {
+    List<String> allTopics = Arrays.asList(
+        "topic1", "topic2", // allowed
+        "topic-with.period-in_middle", ".topic-with-period-at-start", 
"topicWithPeriodAtEnd.", // bad topics
+        "topic3", "topic4"); // in deny list
+    List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+
+    State state = new State();
+
+    // Without any topic validators
+    List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics);
+    Assert.assertEquals(validTopics.size(), 7);
+
+    // Use 2 topic validators: TopicNameValidator and DenyListValidator
+    String validatorsToUse = String.join(",", ImmutableList.of(
+        TopicNameValidator.class.getName(), 
DenyListValidator.class.getName()));
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse);
+    validTopics = new TopicValidators(state).validate(topics);
+
+    Assert.assertEquals(validTopics.size(), 2);
+    Assert.assertTrue(validTopics.stream().anyMatch(topic -> 
topic.getName().equals("topic1")));
+    Assert.assertTrue(validTopics.stream().anyMatch(topic -> 
topic.getName().equals("topic2")));
+  }
+
+  @Test
+  public void testValidatorTimeout() {
+    List<String> allTopics = Arrays.asList("topic1", "topic2", "topic3");
+    List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+    State state = new State();
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, 
RejectEverythingValidator.class.getName());
+    List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics, 
5, TimeUnit.SECONDS);
+    Assert.assertEquals(validTopics.size(), 1); // topic 2 times out, it 
should be treated as a valid topic
+    Assert.assertEquals(validTopics.get(0).getName(), "topic2");
+  }
+
+  private List<KafkaTopic> buildKafkaTopics(List<String> topics) {
+    return topics.stream()
+        .map(topicName -> new KafkaTopic(topicName, Collections.emptyList()))
+        .collect(Collectors.toList());
+  }
+
+  // A TopicValidator class to mimic a deny list
+  public static class DenyListValidator extends TopicValidatorBase {
+    Set<String> denyList = ImmutableSet.of("topic3", "topic4");
+
+    public DenyListValidator(State state) {
+      super(state);
+    }
+
+    @Override
+    public boolean validate(KafkaTopic topic) {
+      return !this.denyList.contains(topic.getName());
+    }
+  }
+
+  // A validator that always returns false when validate() is called.
+  // Sleep for 5 sec when processing topic2 to simulate a slow validation.
+  public static class RejectEverythingValidator extends TopicValidatorBase {
+
+    public RejectEverythingValidator(State state) {
+      super(state);
+    }
+
+    @Override
+    public boolean validate(KafkaTopic topic) {
+      if (!topic.getName().equals("topic2")) {
+        return false;
+      }
+
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return false;
+    }
+  }
+}
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
index b05674a95..09f2f00b4 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
@@ -103,6 +103,21 @@ public class ExecutorsUtils {
     return newThreadFactory(new ThreadFactoryBuilder().setDaemon(true), 
logger, nameFormat);
   }
 
+  /**
+   * Get a new {@link ThreadFactory} that uses a {@link 
LoggingUncaughtExceptionHandler}
+   * to handle uncaught exceptions.
+   * Tasks running within such threads will have the same access control and 
class loader settings as the
+   * thread that invokes this method.
+   *
+   * @param logger an {@link Optional} wrapping the {@link Logger} that the
+   *               {@link LoggingUncaughtExceptionHandler} uses to log 
uncaught exceptions thrown in threads
+   * @return a new {@link ThreadFactory}
+   */
+  public static ThreadFactory newPrivilegedThreadFactory(Optional<Logger> 
logger) {
+    return newThreadFactory(new 
ThreadFactoryBuilder().setThreadFactory(Executors.privilegedThreadFactory()), 
logger,
+        Optional.<String>absent());
+  }
+
   private static ThreadFactory newThreadFactory(ThreadFactoryBuilder builder, 
Optional<Logger> logger,
       Optional<String> nameFormat) {
     if (nameFormat.isPresent()) {

Reply via email to