This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 5cd38ac [GOBBLIN-1162] Provide an option to allow slow containers to commit su… 5cd38ac is described below commit 5cd38ac8d26dd4bccd8582a7182893f29ab5a009 Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Mon Jun 1 11:29:14 2020 -0700 [GOBBLIN-1162] Provide an option to allow slow containers to commit su… Closes #3002 from sv2000/containerSuicide --- .../cluster/ContainerHealthCheckException.java | 36 +++++ .../cluster/GobblinClusterConfigurationKeys.java | 6 + .../apache/gobblin/cluster/GobblinTaskRunner.java | 73 ++++++++- .../cluster/HelixAssignedParticipantCheck.java | 5 +- .../gobblin/cluster/GobblinTaskRunnerTest.java | 54 ++++++- .../reporter/KafkaKeyValueProducerPusherTest.java | 4 +- .../metrics/reporter/KafkaProducerPusherTest.java | 8 +- .../gobblin/runtime/HighLevelConsumerTest.java | 5 +- .../extract/kafka/KafkaExtractorStatsTracker.java | 72 ++++++++- .../extract/kafka/KafkaIngestionHealthCheck.java | 174 +++++++++++++++++++++ .../kafka/KafkaExtractorStatsTrackerTest.java | 32 +++- .../kafka/KafkaIngestionHealthCheckTest.java | 155 ++++++++++++++++++ .../gobblin/service/GobblinServiceManagerTest.java | 19 ++- .../event/ContainerHealthCheckFailureEvent.java | 55 +++++++ .../gobblin/util/eventbus/EventBusFactory.java | 67 ++++++++ .../apache/gobblin/util/eventbus/EventBusKey.java | 42 +++++ .../gobblin/util/eventbus/EventBusFactoryTest.java | 65 ++++++++ .../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 9 +- travis/test-groups.inc | 2 +- 19 files changed, 854 insertions(+), 29 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java new file mode 100644 index 0000000..443bde6 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthCheckException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.cluster; + +/** + * Signals that the container has failed one or more health checks. In other words, the container has detected + * itself to be in an unhealthy state. The application may want to catch this exception to take an appropriate + * action, e.g. exiting with an appropriate exit code.
+ */ +public class ContainerHealthCheckException extends RuntimeException { + public ContainerHealthCheckException() { + super(); + } + + public ContainerHealthCheckException(String message) { + super(message); + } + + public ContainerHealthCheckException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 128a5d6..88585cd 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -177,6 +177,12 @@ public class GobblinClusterConfigurationKeys { public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ; public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = false; + //Config to enable/disable container "suicide" on health check failures. To be used in execution modes, where the exiting + // container can be replaced with another container e.g. Gobblin-on-Yarn mode. 
+ public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure"; + public static final boolean DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false; + + //Config to enable/disable reuse of existing Helix Cluster public static final String HELIX_CLUSTER_OVERWRITE_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.overwrite"; public static final boolean DEFAULT_HELIX_CLUSTER_OVERWRITE = true; diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 103f0cc..68f98a2 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -72,6 +72,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.ServiceManager; @@ -80,15 +81,25 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import lombok.Getter; +import lombok.Setter; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.RootMetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; +import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; 
import org.apache.gobblin.util.FileUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JvmUtils; +import org.apache.gobblin.util.TaskEventMetadataUtils; +import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent; +import org.apache.gobblin.util.eventbus.EventBusFactory; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; /** @@ -130,6 +141,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final Optional<ContainerMetrics> containerMetrics; private final List<Service> services = Lists.newArrayList(); private final Path appWorkPath; + //An EventBus instance that can be accessed from any component running within the worker process. The individual components can + // use the EventBus stream to communicate back application level health check results to the + // GobblinTaskRunner. + private final EventBus containerHealthEventBus; @Getter private HelixManager jobHelixManager; @@ -138,11 +153,16 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private TaskStateModelFactory taskStateModelFactory; private boolean isTaskDriver; private boolean dedicatedTaskDriverCluster; + private boolean isContainerExitOnHealthCheckFailureEnabled; + private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection; @Getter private volatile boolean started = false; private volatile boolean stopInProgress = false; private volatile boolean isStopped = false; + @Getter + @Setter + private volatile boolean healthCheckFailed = false; protected final String taskRunnerId; protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName()); @@ -177,6 +197,23 @@ public class GobblinTaskRunner implements StandardMetricsBridge { // in the application configuration have to be extracted and set before initializing HelixManager. 
HelixUtils.setSystemProperties(config); + this.isContainerExitOnHealthCheckFailureEnabled = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED, + GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED); + + if (this.isContainerExitOnHealthCheckFailureEnabled) { + EventBus eventBus; + try { + eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME, + SharedResourcesBrokerFactory.getImplicitBroker()); + } catch (IOException e) { + logger.error("Could not find EventBus instance for container health check", e); + eventBus = null; + } + this.containerHealthEventBus = eventBus; + } else { + this.containerHealthEventBus = null; + } + initHelixManager(); this.containerMetrics = buildContainerMetrics(); @@ -276,7 +313,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { /** * Start this {@link GobblinTaskRunner} instance. */ - public void start() { + public void start() throws ContainerHealthCheckException { logger.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); // Add a shutdown hook so the task scheduler gets properly shutdown @@ -314,6 +351,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.taskRunnerId); } + if (this.containerHealthEventBus != null) { + //Register itself with the container health event bus instance to receive container health events + logger.info("Registering GobblinTaskRunner with ContainerHealthCheckEventBus.."); + this.containerHealthEventBus.register(this); + } + if (this.serviceManager != null) { this.serviceManager.startAsync(); started = true; @@ -321,6 +364,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } else { started = true; } + + //Check if the TaskRunner shutdown is invoked due to a health check failure. If yes, throw a RuntimeException + // that will be propagated to the caller. 
+ if (this.isContainerExitOnHealthCheckFailureEnabled && GobblinTaskRunner.this.isHealthCheckFailed()) { + logger.error("GobblinTaskRunner finished due to health check failure."); + throw new ContainerHealthCheckException(); + } } public synchronized void stop() { @@ -670,6 +720,27 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } } + @Subscribe + public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) { + logger.error("Received {} from: {}", event.getClass().getSimpleName(), event.getClassName()); + logger.error("Submitting a ContainerHealthCheckFailureEvent.."); + submitEvent(event); + logger.error("Stopping GobblinTaskRunner..."); + GobblinTaskRunner.this.setHealthCheckFailed(true); + GobblinTaskRunner.this.stop(); + } + + private void submitEvent(ContainerHealthCheckFailureEvent event) { + EventSubmitter eventSubmitter = new EventSubmitter.Builder(RootMetricContext.get(), getClass().getPackage().getName()).build(); + GobblinEventBuilder eventBuilder = new GobblinEventBuilder(event.getClass().getSimpleName()); + State taskState = ConfigUtils.configToState(event.getConfig()); + //Add task metadata such as Helix taskId, containerId, and workflowId if configured + TaskEventMetadataGenerator taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState); + eventBuilder.addAdditionalMetadata(taskEventMetadataGenerator.getMetadata(taskState, event.getClass().getSimpleName())); + eventBuilder.addAdditionalMetadata(event.getMetadata()); + eventSubmitter.submit(eventBuilder); + } + private static String getApplicationId() { return "1"; } diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java index 2102365..9aa1eeb 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java +++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java @@ -66,8 +66,6 @@ public class HelixAssignedParticipantCheck implements CommitStep { private final int partitionNum; private final Config config; - private boolean isCompleted; - /** * A method that uses the Singleton pattern to instantiate a {@link HelixManager} instance. * @param config @@ -114,7 +112,7 @@ public class HelixAssignedParticipantCheck implements CommitStep { */ @Override public boolean isCompleted() { - return isCompleted; + return false; } /** @@ -157,7 +155,6 @@ public class HelixAssignedParticipantCheck implements CommitStep { isParticipant = true; } - this.isCompleted = true; if (!isParticipant) { throw new CommitStepException(String.format("Helix instance %s not the assigned participant for partition %d",this.helixInstanceName, this.partitionNum)); } diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java index 98d13df..7a0b879 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java @@ -19,8 +19,10 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.net.URL; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.FileSystem; @@ -38,12 +40,16 @@ import org.testng.annotations.Test; import com.google.common.base.Optional; import com.google.common.base.Predicate; +import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; 
import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.testing.AssertWithBackoff; +import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent; +import org.apache.gobblin.util.eventbus.EventBusFactory; /** @@ -68,9 +74,10 @@ public class GobblinTaskRunnerTest { private TestingServer testingZKServer; private GobblinTaskRunner gobblinTaskRunner; + private GobblinTaskRunner gobblinTaskRunnerHealthCheck; + private GobblinTaskRunner corruptGobblinTaskRunner; private GobblinClusterManager gobblinClusterManager; - private GobblinTaskRunner corruptGobblinTaskRunner; private String clusterName; private String corruptHelixInstance; private TaskAssignmentAfterConnectionRetry suite; @@ -101,7 +108,14 @@ public class GobblinTaskRunnerTest { this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME, TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.<Path>absent()); - this.gobblinTaskRunner.connectHelixManager(); + + // Participant + String healthCheckInstance = HelixUtils.getHelixInstanceName("HealthCheckHelixInstance", 0); + this.gobblinTaskRunnerHealthCheck = + new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, healthCheckInstance, + TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, + config.withValue(GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED, ConfigValueFactory.fromAnyRef(true)) + , Optional.<Path>absent()); // Participant with a partial Instance set up on Helix/ZK this.corruptHelixInstance = HelixUtils.getHelixInstanceName("CorruptHelixInstance", 0); @@ -118,6 +132,8 @@ public class GobblinTaskRunnerTest { @Test public void testSendReceiveShutdownMessage() throws Exception { + this.gobblinTaskRunner.connectHelixManager(); + ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start()); @@ -148,7 +164,6 @@ public 
class GobblinTaskRunnerTest { Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value"); } - @Test public void testConnectHelixManagerWithRetry() { HelixManager instanceManager = HelixManagerFactory.getZKHelixManager( @@ -196,6 +211,38 @@ public class GobblinTaskRunnerTest { helixManager.disconnect(); } + @Test (groups = {"disabledOnTravis"}, dependsOnMethods = "testSendReceiveShutdownMessage", expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*ContainerHealthCheckException.*") + public void testShutdownOnHealthCheckFailure() throws Exception { + this.gobblinTaskRunnerHealthCheck.connectHelixManager(); + + ExecutorService service = Executors.newSingleThreadExecutor(); + Future future = service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.start()); + + Logger log = LoggerFactory.getLogger("testHandleContainerHealthCheckFailure"); + + // Give Helix some time to start the task runner + AssertWithBackoff.create().logger(log).timeoutMs(20000) + .assertTrue(new Predicate<Void>() { + @Override public boolean apply(Void input) { + return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStarted(); + } + }, "gobblinTaskRunner started"); + + EventBus eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME, + SharedResourcesBrokerFactory.getImplicitBroker()); + eventBus.post(new ContainerHealthCheckFailureEvent(ConfigFactory.empty(), getClass().getName())); + + // Give some time to allow GobblinTaskRunner to handle the ContainerHealthCheckFailureEvent + AssertWithBackoff.create().logger(log).timeoutMs(30000) + .assertTrue(new Predicate<Void>() { + @Override public boolean apply(Void input) { + return GobblinTaskRunnerTest.this.gobblinTaskRunnerHealthCheck.isStopped(); + } + }, "gobblinTaskRunner stopped"); + + //Call Future#get() to check and ensure that ContainerHealthCheckException is thrown + future.get(); + } public static class 
TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite { TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) { @@ -223,6 +270,7 @@ public class GobblinTaskRunnerTest { this.gobblinClusterManager.disconnectHelixManager(); this.gobblinTaskRunner.disconnectHelixManager(); this.corruptGobblinTaskRunner.disconnectHelixManager(); + this.gobblinTaskRunnerHealthCheck.disconnectHelixManager(); if (this.suite != null) { this.suite.shutdownCluster(); } diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java index 1681d7b..f0914b6 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaKeyValueProducerPusherTest.java @@ -58,9 +58,9 @@ public class KafkaKeyValueProducerPusherTest { @Test public void test() throws IOException { // Test that the scoped config overrides the generic config - Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC, + Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort())))); + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort())))); String msg1 = "msg1"; String msg2 = "msg2"; diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java index 723f8b7..5531213 100644 --- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/metrics/reporter/KafkaProducerPusherTest.java @@ -30,12 +30,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.typesafe.config.ConfigFactory; +import kafka.consumer.ConsumerIterator; + import org.apache.gobblin.kafka.KafkaTestBase; import org.apache.gobblin.metrics.kafka.KafkaProducerPusher; import org.apache.gobblin.metrics.kafka.Pusher; -import kafka.consumer.ConsumerIterator; - /** * Test {@link org.apache.gobblin.metrics.kafka.KafkaProducerPusher}. @@ -56,8 +56,8 @@ public class KafkaProducerPusherTest { @Test public void test() throws IOException { // Test that the scoped config overrides the generic config - Pusher pusher = new KafkaProducerPusher("localhost:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort())))); + Pusher pusher = new KafkaProducerPusher("127.0.0.1:dummy", TOPIC, Optional.of(ConfigFactory.parseMap(ImmutableMap.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + this.kafkaTestHelper.getKafkaServerPort())))); String msg1 = "msg1"; String msg2 = "msg2"; diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java index c101d15..a54a75c 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java @@ -44,6 +44,7 @@ import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys; import org.apache.gobblin.runtime.kafka.HighLevelConsumer; import 
org.apache.gobblin.runtime.kafka.MockedHighLevelConsumer; import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; +import org.apache.gobblin.test.TestUtils; import org.apache.gobblin.testing.AssertWithBackoff; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.writer.AsyncDataWriter; @@ -67,7 +68,7 @@ public class HighLevelConsumerTest extends KafkaTestBase { public HighLevelConsumerTest() throws InterruptedException, RuntimeException { super(); - _kafkaBrokers = "localhost:" + this.getKafkaServerPort(); + _kafkaBrokers = "127.0.0.1:" + this.getKafkaServerPort(); } @BeforeSuite @@ -92,7 +93,7 @@ public class HighLevelConsumerTest extends KafkaTestBase { public static Config getSimpleConfig(Optional<String> prefix) { Properties properties = new Properties(); - properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "localhost:1234"); + properties.put(getConfigKey(prefix, ConfigurationKeys.KAFKA_BROKERS), "127.0.0.1:" + TestUtils.findFreePort()); properties.put(getConfigKey(prefix, Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), Kafka09ConsumerClient.KAFKA_09_DEFAULT_KEY_DESERIALIZER); properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper"); properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true"); diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java index 103bfe0..77cca0c 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java @@ -32,6 +32,7 @@ import com.google.common.base.Charsets; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; +import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -94,6 +95,13 @@ public class KafkaExtractorStatsTracker { private List<KafkaPartition> partitions; private long maxPossibleLatency; + //Extractor stats aggregated across all partitions processed by the extractor. + @Getter (AccessLevel.PACKAGE) + @VisibleForTesting + private AggregateExtractorStats aggregateExtractorStats = new AggregateExtractorStats(); + //Aggregate stats for the extractor derived from the most recently completed epoch + private AggregateExtractorStats lastAggregateExtractorStats; + public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) { this.workUnitState = state; this.partitions = partitions; @@ -144,7 +152,7 @@ public class KafkaExtractorStatsTracker { } /** - * A Java POJO that encapsulates various extractor stats. + * A Java POJO that encapsulates per-partition extractor stats. */ @Data public static class ExtractorStats { @@ -166,6 +174,21 @@ public class KafkaExtractorStatsTracker { } /** + * A Java POJO to track the aggregate extractor stats across all partitions processed by the extractor. + */ + @Data + public static class AggregateExtractorStats { + private long maxIngestionLatency; + private long numBytesConsumed; + private long minStartFetchEpochTime = Long.MAX_VALUE; + private long maxStopFetchEpochTime; + private long minLogAppendTime = Long.MAX_VALUE; + private long maxLogAppendTime; + private long slaMissedRecordCount; + private long processedRecordCount; + } + + /** * * @param partitionIdx index of Kafka topic partition. * @return the number of undecodeable records for a given partition id. 
@@ -292,6 +315,31 @@ public class KafkaExtractorStatsTracker { return v; }); onPartitionReadComplete(partitionIdx, readStartTime); + updateAggregateExtractorStats(partitionIdx); + } + + private void updateAggregateExtractorStats(int partitionIdx) { + ExtractorStats partitionStats = this.statsMap.get(this.partitions.get(partitionIdx)); + + if (partitionStats.getStartFetchEpochTime() < aggregateExtractorStats.getMinStartFetchEpochTime()) { + aggregateExtractorStats.setMinStartFetchEpochTime(partitionStats.getStartFetchEpochTime()); + } + if (partitionStats.getStopFetchEpochTime() > aggregateExtractorStats.getMaxStopFetchEpochTime()) { + aggregateExtractorStats.setMaxStopFetchEpochTime(partitionStats.getStopFetchEpochTime()); + } + long partitionLatency = partitionStats.getStopFetchEpochTime() - partitionStats.getMinLogAppendTime(); + if (aggregateExtractorStats.getMaxIngestionLatency() < partitionLatency) { + aggregateExtractorStats.setMaxIngestionLatency(partitionLatency); + } + if (aggregateExtractorStats.getMinLogAppendTime() > partitionStats.getMinLogAppendTime()) { + aggregateExtractorStats.setMinLogAppendTime(partitionStats.getMinLogAppendTime()); + } + if (aggregateExtractorStats.getMaxLogAppendTime() < partitionStats.getMaxLogAppendTime()) { + aggregateExtractorStats.setMaxLogAppendTime(partitionStats.getMaxLogAppendTime()); + } + aggregateExtractorStats.setProcessedRecordCount(aggregateExtractorStats.getProcessedRecordCount() + partitionStats.getProcessedRecordCount()); + aggregateExtractorStats.setNumBytesConsumed(aggregateExtractorStats.getNumBytesConsumed() + partitionStats.getPartitionTotalSize()); + aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount() + partitionStats.getSlaMissedRecordCount()); } private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) { @@ -451,9 +499,31 @@ public class 
KafkaExtractorStatsTracker { } /** + * @param timeUnit the time unit for the ingestion latency. + * @return the maximum ingestion latency across all partitions processed by the extractor from the last + * completed epoch. + */ + public long getMaxIngestionLatency(TimeUnit timeUnit) { + return timeUnit.convert(this.lastAggregateExtractorStats.getMaxIngestionLatency(), TimeUnit.MILLISECONDS); + } + + /** + * + * @return the consumption rate in MB/s across all partitions processed by the extractor from the last + * completed epoch. + */ + public double getConsumptionRateMBps() { + double consumptionDurationSecs = ((double) (this.lastAggregateExtractorStats.getMaxStopFetchEpochTime() - this.lastAggregateExtractorStats + .getMinStartFetchEpochTime())) / 1000; + return this.lastAggregateExtractorStats.getNumBytesConsumed() / (consumptionDurationSecs * (1024 * 1024L)); + } + + /** * Reset all KafkaExtractor stats. */ public void reset() { + this.lastAggregateExtractorStats = this.aggregateExtractorStats; + this.aggregateExtractorStats = new AggregateExtractorStats(); this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats())); for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) { resetStartFetchEpochTime(partitionIdx); diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java new file mode 100644 index 0000000..d7a8346 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.source.extractor.extract.kafka; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.EvictingQueue; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.commit.CommitStep; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent; +import org.apache.gobblin.util.eventbus.EventBusFactory; + + +@Slf4j +@Alias(value = "KafkaIngestionHealthCheck") +public class KafkaIngestionHealthCheck implements CommitStep { + public static final String KAFKA_INGESTION_HEALTH_CHECK_PREFIX = "gobblin.kafka.healthCheck."; + public static final String KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "slidingWindow.size"; + public static final String KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "ingestionLatency.minutes"; + public static final String KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + 
"consumptionRate.dropOffFraction"; + public static final String KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "expected.consumptionRateMbps"; + public static final String KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY = KAFKA_INGESTION_HEALTH_CHECK_PREFIX + "increasingLatencyCheckEnabled"; + + public static final int DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE = 3; + public static final long DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES= 15; + public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION = 0.7; + public static final double DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS = 10.0; + private static final boolean DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED = true; + + private final Config config; + private final EventBus eventBus; + private final KafkaExtractorStatsTracker statsTracker; + private final double expectedConsumptionRate; + private final double consumptionRateDropOffFraction; + private final long ingestionLatencyThresholdMinutes; + private final int slidingWindowSize; + private final EvictingQueue<Long> ingestionLatencies; + private final EvictingQueue<Double> consumptionRateMBps; + private final boolean increasingLatencyCheckEnabled; + + public KafkaIngestionHealthCheck(Config config, KafkaExtractorStatsTracker statsTracker) { + this.config = config; + this.slidingWindowSize = ConfigUtils.getInt(config, KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_SLIDING_WINDOW_SIZE); + this.ingestionLatencyThresholdMinutes = ConfigUtils.getLong(config, KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES); + this.consumptionRateDropOffFraction = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION_KEY, 
DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_CONSUMPTION_RATE_DROPOFF_FRACTION); + this.expectedConsumptionRate = ConfigUtils.getDouble(config, KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS); + this.increasingLatencyCheckEnabled = ConfigUtils.getBoolean(config, KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, DEFAULT_KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED); + this.ingestionLatencies = EvictingQueue.create(this.slidingWindowSize); + this.consumptionRateMBps = EvictingQueue.create(this.slidingWindowSize); + EventBus eventBus; + try { + eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME, + SharedResourcesBrokerFactory.getImplicitBroker()); + } catch (IOException e) { + log.error("Could not find EventBus instance for container health check", e); + eventBus = null; + } + this.eventBus = eventBus; + this.statsTracker = statsTracker; + } + + /** + * + * @return true if (i) ingestionLatency in the each of the recent epochs exceeds the threshold latency , AND (ii) + * if {@link KafkaIngestionHealthCheck#increasingLatencyCheckEnabled} is true, the latency + * is increasing over these epochs. + */ + private boolean checkIngestionLatency() { + Long previousLatency = -1L; + for (Long ingestionLatency: ingestionLatencies) { + if (ingestionLatency < this.ingestionLatencyThresholdMinutes) { + return false; + } else { + if (this.increasingLatencyCheckEnabled) { + if (previousLatency >= ingestionLatency) { + return false; + } + previousLatency = ingestionLatency; + } + } + } + return true; + } + + /** + * Determine whether the commit step has been completed. + */ + @Override + public boolean isCompleted() + throws IOException { + return false; + } + + /** + * @return Return a serialized string representation of health check report. 
+ */ + private String getHealthCheckReport() { + return String.format("Ingestion Latencies = %s, Ingestion Latency Threshold = %s minutes, " + + "Consumption Rates = %s, Target Consumption Rate = %s MBps", this.ingestionLatencies.toString(), + this.ingestionLatencyThresholdMinutes, this.consumptionRateMBps.toString(), this.expectedConsumptionRate); + } + + /** + * Execute the commit step. The execute method gets the maximum ingestion latency and the consumption rate and emits + * a {@link ContainerHealthCheckFailureEvent} if the following conditions are satisfied: + * <li> + * <ul>The ingestion latency increases monotonically over the {@link KafkaIngestionHealthCheck#slidingWindowSize} intervals, AND </ul> + * <ul>The maximum consumption rate over the {@link KafkaIngestionHealthCheck#slidingWindowSize} intervals is smaller than + * {@link KafkaIngestionHealthCheck#consumptionRateDropOffFraction} * {@link KafkaIngestionHealthCheck#expectedConsumptionRate}</ul>. + * </li> + * + * The {@link ContainerHealthCheckFailureEvent} is posted to a global event bus. The handlers of this event type + * can perform suitable actions based on the execution environment. + */ + @Override + public void execute() { + this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES)); + this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps()); + if (ingestionLatencies.size() < this.slidingWindowSize) { + log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize); + return; + } + + if (!checkIngestionLatency()) { + log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency Threshold: {}", this.ingestionLatencies.toString(), this.ingestionLatencyThresholdMinutes); + return; + } + + double avgConsumptionRate = getMaxConsumptionRate(); + if (avgConsumptionRate > this.consumptionRateDropOffFraction * this.expectedConsumptionRate) { + log.info("SUCCESS: Avg. 
Consumption Rate = {} MBps, Target Consumption rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate); + return; + } + + log.error("FAILED: {}", getHealthCheckReport()); + + if (this.eventBus != null) { + log.info("Posting {} message to EventBus", ContainerHealthCheckFailureEvent.class.getSimpleName()); + ContainerHealthCheckFailureEvent event = new ContainerHealthCheckFailureEvent(this.config, getClass().getName()); + event.addMetadata("ingestionLatencies", this.ingestionLatencies.toString()); + event.addMetadata("consumptionRates", this.consumptionRateMBps.toString()); + event.addMetadata("ingestionLatencyThreshold", Long.toString(this.ingestionLatencyThresholdMinutes)); + event.addMetadata("targetConsumptionRate", Double.toString(this.expectedConsumptionRate)); + this.eventBus.post(event); + } + } + + private double getMaxConsumptionRate() { + return consumptionRateMBps.stream().mapToDouble(consumptionRate -> consumptionRate) + .filter(consumptionRate -> consumptionRate >= 0.0).max().orElse(0.0); + } +} diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java index 1f959f2..67e1146 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.HdrHistogram.Histogram; import org.HdrHistogram.HistogramLogReader; @@ -40,6 +41,7 @@ public class KafkaExtractorStatsTrackerTest { private WorkUnitState workUnitState; final static 
KafkaPartition PARTITION0 = new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build(); final static KafkaPartition PARTITION1 = new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build(); + private long epochDurationMs; @BeforeClass public void setUp() { @@ -80,12 +82,13 @@ public class KafkaExtractorStatsTrackerTest { @Test public void testOnDecodeableRecord() throws InterruptedException { + this.extractorStatsTracker.reset(); long readStartTime = System.nanoTime(); Thread.sleep(1); long decodeStartTime = System.nanoTime(); long currentTimeMillis = System.currentTimeMillis(); - long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L; - long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L; + long logAppendTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(15); + long recordCreationTimestamp = currentTimeMillis - TimeUnit.MINUTES.toMillis(16); Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0); Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0); Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0); @@ -172,6 +175,20 @@ public class KafkaExtractorStatsTrackerTest { Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp); Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp); Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3); + + long startFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getStartFetchEpochTime(); + long stopFetchEpochTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getStopFetchEpochTime(); + this.epochDurationMs = stopFetchEpochTime - 
startFetchEpochTime; + long minLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(); + long maxLogAppendTimestamp = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(); + //Ensure aggregate extractor stats have been updated correctly for the completed epoch + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinStartFetchEpochTime(), startFetchEpochTime); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxStopFetchEpochTime(), stopFetchEpochTime); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMinLogAppendTime(), minLogAppendTimestamp); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getMaxLogAppendTime(), maxLogAppendTimestamp); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getNumBytesConsumed(), 300L); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getProcessedRecordCount(), 3L); + Assert.assertEquals(this.extractorStatsTracker.getAggregateExtractorStats().getSlaMissedRecordCount(), 1); } @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition") @@ -189,6 +206,17 @@ public class KafkaExtractorStatsTrackerTest { Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150); } + @Test (dependsOnMethods = "testGetAvgRecordSize") + public void testGetMaxLatency() { + Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES) >= 15); + } + + @Test (dependsOnMethods = "testGetAvgRecordSize") + public void testGetConsumptionRateMBps() { + double a = this.extractorStatsTracker.getConsumptionRateMBps(); + Assert.assertEquals((new Double(Math.ceil(a * epochDurationMs * 1024 * 1024) / 1000)).longValue(), 300L); + } + @Test public void testGenerateTagsForPartitions() throws Exception { MultiLongWatermark lowWatermark = new 
MultiLongWatermark(Arrays.asList(new Long(10), new Long(20))); diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java new file mode 100644 index 0000000..ff7049f --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheckTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.source.extractor.extract.kafka; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent; +import org.apache.gobblin.util.eventbus.EventBusFactory; + +@Test (singleThreaded = true) +public class KafkaIngestionHealthCheckTest { + private EventBus eventBus; + private CountDownLatch countDownLatch; + + @BeforeClass + public void setUp() throws IOException { + this.eventBus = EventBusFactory.get(ContainerHealthCheckFailureEvent.CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME, + SharedResourcesBrokerFactory.getImplicitBroker()); + this.eventBus.register(this); + } + + @Subscribe + public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) { + this.countDownLatch.countDown(); + } + + @Test + public void testExecuteIncreasingLatencyCheckEnabled() + throws InterruptedException { + this.countDownLatch = new CountDownLatch(1); + Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, + ConfigValueFactory.fromAnyRef(5)) + .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5)); + + KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class); + Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES)) + .thenReturn(6L) + .thenReturn(7L) + .thenReturn(10L) + 
.thenReturn(5L); + Mockito.when(extractorStatsTracker.getConsumptionRateMBps()) + .thenReturn(2.0) + .thenReturn(1.5) + .thenReturn(2.1) + .thenReturn(2.5); + + KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker); + + //Latency increases continuously for the first 3 calls to execute(). + check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1L); + check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1L); + check.execute(); + //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0. + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 0); + + //Set the countdown latch back to 1. + this.countDownLatch = new CountDownLatch(1); + //Latency decreases from 10 to 5. So check.execute() should not post any event to EventBus. 
+ check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1); + + config = config.withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false)); + extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class); + Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES)) + .thenReturn(10L) + .thenReturn(7L) + .thenReturn(5L); + Mockito.when(extractorStatsTracker.getConsumptionRateMBps()) + .thenReturn(2.0) + .thenReturn(1.5) + .thenReturn(2.1); + + check = new KafkaIngestionHealthCheck(config, extractorStatsTracker); + + check.execute(); + } + + @Test + public void testExecuteIncreasingLatencyCheckDisabled() + throws InterruptedException { + this.countDownLatch = new CountDownLatch(1); + + Config config = ConfigFactory.empty().withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_EXPECTED_CONSUMPTION_RATE_MBPS_KEY, + ConfigValueFactory.fromAnyRef(5)) + .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_LATENCY_THRESHOLD_MINUTES_KEY, ConfigValueFactory.fromAnyRef(5)) + .withValue(KafkaIngestionHealthCheck.KAFKA_INGESTION_HEALTH_CHECK_INCREASING_LATENCY_CHECK_ENABLED_KEY, ConfigValueFactory.fromAnyRef(false)); + + KafkaExtractorStatsTracker extractorStatsTracker = Mockito.mock(KafkaExtractorStatsTracker.class); + Mockito.when(extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES)) + .thenReturn(10L) + .thenReturn(7L) + .thenReturn(6L) + .thenReturn(4L); + Mockito.when(extractorStatsTracker.getConsumptionRateMBps()) + .thenReturn(2.0) + .thenReturn(1.5) + .thenReturn(2.1) + .thenReturn(2.5); + + KafkaIngestionHealthCheck check = new KafkaIngestionHealthCheck(config, extractorStatsTracker); + + //Latency consistently above 5 minutes for the first 3 calls to execute(). 
+ check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1L); + check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1L); + check.execute(); + //Ensure that ContainerHealthCheckFailureEvent is posted to eventBus; countDownLatch should be back to 0. + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 0); + + //Set the countdown latch back to 1. + this.countDownLatch = new CountDownLatch(1); + //Latency decreases to 4. So check.execute() should not post any event to EventBus. + check.execute(); + this.countDownLatch.await(10, TimeUnit.MILLISECONDS); + Assert.assertEquals(this.countDownLatch.getCount(), 1); + } +} \ No newline at end of file diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java similarity index 97% rename from gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java rename to gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java index ded0d5c..282ede6 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java @@ -25,9 +25,7 @@ import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; - -import org.apache.gobblin.restli.EmbeddedRestliServer; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.Path; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jgit.api.Git; @@ -52,12 +50,13 @@ import 
com.linkedin.restli.client.RestLiResponseException; import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.kafka.KafkaTestBase; import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory; import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.restli.EmbeddedRestliServer; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.modules.core.GitConfigMonitor; import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.monitoring.FsJobStatusRetriever; @@ -93,6 +92,7 @@ public class GobblinServiceManagerTest { private FlowConfigClient flowConfigClient; private Git gitForPush; + private TestingServer testingServer; @BeforeClass public void setup() throws Exception { @@ -100,14 +100,12 @@ public class GobblinServiceManagerTest { cleanUpDir(SPEC_STORE_PARENT_DIR); ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); - KafkaTestBase kafkaTestHelper = new KafkaTestBase(); - kafkaTestHelper.startServers(); - + testingServer = new TestingServer(true); Properties serviceCoreProperties = new Properties(); serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser"); serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword"); serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl()); - serviceCoreProperties.put("zookeeper.connect", kafkaTestHelper.getZkConnectString()); + serviceCoreProperties.put("zookeeper.connect", testingServer.getConnectString()); serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName()); 
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR); @@ -178,6 +176,11 @@ public class GobblinServiceManagerTest { } catch (Exception e) { logger.warn("Could not completely cleanup Spec Store Parent Dir"); } + try { + this.testingServer.close(); + } catch(Exception e) { + System.err.println("Failed to close ZK testing server."); + } } @Test diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java new file mode 100644 index 0000000..e10e675 --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/event/ContainerHealthCheckFailureEvent.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * An event type to signal failure of a container health check. This event can be generated from anywhere + * inside the application. This event is intended to be emitted + * over an {@link com.google.common.eventbus.EventBus} instance. 
+ */ +package org.apache.gobblin.util.event; + +import java.util.Map; + +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import lombok.Getter; + +public class ContainerHealthCheckFailureEvent { + public static final String CONTAINER_HEALTH_CHECK_EVENT_BUS_NAME = "ContainerHealthCheckEventBus"; + + @Getter + private final Config config; + + /** + * Name of the class that generated this failure event. + */ + @Getter + private final String className; + + @Getter + private final Map<String, String> metadata = Maps.newHashMap(); + + public ContainerHealthCheckFailureEvent(Config config, String className) { + this.config = config; + this.className = className; + } + + public void addMetadata(String key, String value) { + metadata.put(key, value); + } +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java new file mode 100644 index 0000000..4c4fdbd --- /dev/null +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.eventbus; + +import java.io.IOException; + +import com.google.common.eventbus.EventBus; + +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopeType; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +/** + * A {@link SharedResourceFactory} for creating {@link EventBus} instances. + * @param <S> + */ +public class EventBusFactory<S extends ScopeType<S>> implements SharedResourceFactory<EventBus, EventBusKey, S> { + public static final String FACTORY_NAME = "eventbus"; + + @Override + public String getName() { + return FACTORY_NAME; + } + + public static <S extends ScopeType<S>> EventBus get(String eventBusName, SharedResourcesBroker<S> broker) + throws IOException { + try { + return broker.getSharedResource(new EventBusFactory<S>(), new EventBusKey(eventBusName)); + } catch (NotConfiguredException e) { + throw new IOException(e); + } + } + + @Override + public SharedResourceFactoryResponse<EventBus> createResource(SharedResourcesBroker<S> broker, + ScopedConfigView<S, EventBusKey> config) { + EventBusKey eventBusKey = config.getKey(); + EventBus eventBus = new EventBus(eventBusKey.getSourceClassName()); + return new ResourceInstance<>(eventBus); + } + + @Override + public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, EventBusKey> config) { + return broker.selfScope().getType().rootScope(); + } +} diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java new file mode 100644 index 0000000..7f4c459 --- /dev/null +++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/eventbus/EventBusKey.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.util.eventbus; + +import lombok.EqualsAndHashCode; +import lombok.Getter; + +import org.apache.gobblin.broker.iface.SharedResourceKey; + +@EqualsAndHashCode +@Getter +public class EventBusKey implements SharedResourceKey{ + private final String sourceClassName; + + public EventBusKey(String sourceClassName) { + this.sourceClassName = sourceClassName; + } + + /** + * @return A serialization of the {@link SharedResourceKey} into a short, sanitized string. Users configure a + * shared resource using the value of this method. 
+ */ + @Override + public String toConfigurationKey() { + return this.sourceClassName; + } +} diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java new file mode 100644 index 0000000..18cd5a1 --- /dev/null +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/eventbus/EventBusFactoryTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.util.eventbus; + +import java.io.IOException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.eventbus.EventBus; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.SharedResourcesBrokerImpl; +import org.apache.gobblin.broker.SimpleScope; +import org.apache.gobblin.broker.SimpleScopeType; +import org.apache.gobblin.broker.iface.NoSuchScopeException; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + + +public class EventBusFactoryTest { + + @Test + public void testGet() + throws NotConfiguredException, IOException, NoSuchScopeException { + SharedResourcesBrokerImpl<SimpleScopeType> broker = SharedResourcesBrokerFactory.<SimpleScopeType>createDefaultTopLevelBroker( + ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance()); + + EventBus eventBus1 = EventBusFactory.get(getClass().getSimpleName(), broker); + EventBus eventBus2 = EventBusFactory.get(getClass().getSimpleName(), broker); + + //Should return the same eventbus instance + Assert.assertEquals(eventBus1, eventBus2); + + SharedResourcesBroker<SimpleScopeType> subBroker = + broker.newSubscopedBuilder(new SimpleScope<>(SimpleScopeType.LOCAL, "local")).build(); + EventBus eventBus3 = EventBusFactory.get(getClass().getSimpleName(), subBroker); + //Should return the same eventbus instance + Assert.assertEquals(eventBus1, eventBus3); + + //Create a new eventbus with local scope + EventBus eventBus4 = subBroker.getSharedResourceAtScope(new EventBusFactory<>(), new EventBusKey(getClass().getSimpleName()), SimpleScopeType.LOCAL); + Assert.assertNotEquals(eventBus3, eventBus4); + + //Create an eventbus with different source class name + EventBus eventBus5 = EventBusFactory.get("", broker); + Assert.assertNotEquals(eventBus1, eventBus5); + } +} \ No newline at 
end of file diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java index f79a5bb..1abf8d2 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java @@ -25,7 +25,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.gobblin.util.logs.LogCopier; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -46,11 +45,13 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.cluster.ContainerHealthCheckException; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.GobblinClusterUtils; import org.apache.gobblin.cluster.GobblinTaskRunner; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.logs.Log4jConfigurationHelper; +import org.apache.gobblin.util.logs.LogCopier; import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent; @@ -209,6 +210,12 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner { } catch (ParseException pe) { printUsage(options); System.exit(1); + } catch (ContainerHealthCheckException e) { + // Ideally, we should not be catching this exception, as this is indicative of a non-recoverable exception. However, + // simply propagating the exception may prevent the container exit due to the presence of non-daemon threads present + // in the application. Hence, we catch this exception to invoke System.exit() which in turn ensures that all non-daemon threads are killed. 
+ LOGGER.error("Exception encountered: {}", e); + System.exit(1); } } } \ No newline at end of file diff --git a/travis/test-groups.inc b/travis/test-groups.inc index af6e517..8eef534 100644 --- a/travis/test-groups.inc +++ b/travis/test-groups.inc @@ -1 +1 @@ -TEST_GROUP1=gobbin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction +TEST_GROUP1=gobbin.yarn,gobblin.runtime,gobblin.cluster,gobblin.compaction,gobblin.util,gobblin.writer