Repository: twill Updated Branches: refs/heads/master 439c1096d -> 107dc1e20
(TWILL-61) Fix to allow subsequent attempts to relaunch the app after the first attempt fails - Delete the Kafka root ZK node for the application if it already exists - Delete the AM instance ZK node if it already exists - For the runnables parent ZK node, it is not an error if it already exists - Enhance the KafkaClient publisher / consumer to deal with Kafka cluster changes - When the AM is killed and restarted, the embedded Kafka will be running on a different host and port This closes #67 on GitHub. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/107dc1e2 Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/107dc1e2 Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/107dc1e2 Branch: refs/heads/master Commit: 107dc1e20c63207695bb5f8b5f97186b6b3f9412 Parents: 439c109 Author: Terence Yim <[email protected]> Authored: Fri Mar 9 12:21:26 2018 -0800 Committer: Terence Yim <[email protected]> Committed: Thu Mar 15 16:13:34 2018 -0700 ---------------------------------------------------------------------- pom.xml | 14 ++ .../main/java/org/apache/twill/api/Configs.java | 12 ++ .../twill/internal/AbstractTwillController.java | 18 ++ .../twill/internal/AbstractTwillService.java | 14 +- .../kafka/client/SimpleKafkaConsumer.java | 19 +- .../kafka/client/SimpleKafkaPublisher.java | 41 ++-- .../internal/kafka/client/ZKBrokerService.java | 39 ++-- .../internal/yarn/Hadoop21YarnAppClient.java | 17 +- .../internal/yarn/Hadoop23YarnAppClient.java | 4 +- .../internal/yarn/Hadoop26YarnAppClient.java | 48 +++++ .../appmaster/ApplicationMasterMain.java | 7 +- .../appmaster/ApplicationMasterService.java | 10 +- .../yarn/VersionDetectYarnAppClientFactory.java | 6 +- .../apache/twill/yarn/YarnTwillController.java | 11 ++ .../apache/twill/yarn/AppRecoveryTestRun.java | 189 +++++++++++++++++++ .../apache/twill/zookeeper/ZKOperations.java | 72 +++++++ 16 files changed, 467 insertions(+), 54 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 84c4b8c..45aa64d 100644 --- a/pom.xml +++ b/pom.xml @@ -180,6 +180,7 @@ <jopt-simple.version>3.2</jopt-simple.version> <commons-compress.version>1.5</commons-compress.version> <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir> + <force.mac.tests>false</force.mac.tests> </properties> <scm> @@ -346,6 +347,7 @@ <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile> <systemPropertyVariables> <java.io.tmpdir>${project.build.directory}</java.io.tmpdir> + <force.mac.tests>${force.mac.tests}</force.mac.tests> </systemPropertyVariables> <reuseForks>false</reuseForks> <reportFormat>plain</reportFormat> @@ -362,6 +364,18 @@ <profiles> <profile> + <!-- + This profile is to force certain tests to run on Mac. + Those tests are disabled due to orphan processes left after the test run (HADOOP-12317). 
+ If this profile is enabled, after the test finished, run the `jps` command + and delete all `TwillLauncher` processes + --> + <id>force-mac-tests</id> + <properties> + <force.mac.tests>true</force.mac.tests> + </properties> + </profile> + <profile> <id>apache-release</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-api/src/main/java/org/apache/twill/api/Configs.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java index 9a21489..20a25f6 100644 --- a/twill-api/src/main/java/org/apache/twill/api/Configs.java +++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java @@ -79,6 +79,18 @@ public final class Configs { public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb"; /** + * Maximum number of attempts to run the application by YARN if there is failure. + */ + public static final String YARN_MAX_APP_ATTEMPTS = "twill.yarn.max.app.attempts"; + + /** + * Interval time in milliseconds for the attempt failures validity interval in YARN. YARN only limit to + * the maximum attempt count for failures in the given interval. + */ + public static final String YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL = + "twill.yarn.attempt.failures.validity.interval"; + + /** * Setting for enabling log collection. 
*/ public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled"; http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java index fd8a939..0ff2fc8 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java @@ -222,6 +222,24 @@ public abstract class AbstractTwillController extends AbstractZKServiceControlle return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames); } + /** + * Reset the log handler to poll from the beginning of Kafka. + */ + protected final synchronized void resetLogHandler() { + if (kafkaClient == null) { + return; + } + if (logCancellable != null) { + logCancellable.cancel(); + logCancellable = null; + } + if (!logHandlers.isEmpty()) { + logCancellable = kafkaClient.getConsumer().prepare() + .addFromBeginning(Constants.LOG_TOPIC, 0) + .consume(new LogMessageCallback(logHandlers)); + } + } + private void validateInstanceIds(String runnable, Set<Integer> instanceIds) { ResourceReport resourceReport = getResourceReport(); if (resourceReport == null) { http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index 8e73653..425cd43 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ 
b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -206,7 +206,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic } /** - * Update the live node for the runnable. + * Update the live node for the service. * * @return A {@link OperationFuture} that will be completed when the update is done. */ @@ -216,11 +216,15 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic return zkClient.setData(liveNodePath, serializeLiveNode()); } + /** + * Creates the live node for the service. If the node already exists, it will be deleted before creation. + * + * @return A {@link OperationFuture} that will be completed when the creation is done. + */ private OperationFuture<String> createLiveNode() { - String liveNodePath = getLiveNodePath(); - LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath); - return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL), - KeeperException.NodeExistsException.class, liveNodePath); + final String liveNodePath = getLiveNodePath(); + LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath); + return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true); } private OperationFuture<String> removeLiveNode() { http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java index 73235b7..f69350e 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java +++ 
b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java @@ -340,17 +340,17 @@ final class SimpleKafkaConsumer implements KafkaConsumer { continue; } - // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset - // from kafak server. - long off = offset.get(); - if (off < 0) { - offset.set(getLastOffset(topicPart, off)); - } + try { + // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset + // from kafak server. + long off = offset.get(); + if (off < 0) { + offset.set(getLastOffset(topicPart, off)); + } - SimpleConsumer consumer = consumerEntry.getValue(); + SimpleConsumer consumer = consumerEntry.getValue(); - // Fire a fetch message request - try { + // Fire a fetch message request FetchResponse response = fetchMessages(consumer, offset.get()); // Failure response, set consumer entry to null and let next round of loop to handle it. @@ -364,6 +364,7 @@ final class SimpleKafkaConsumer implements KafkaConsumer { consumers.refresh(consumerEntry.getKey()); consumerEntry = null; + backoff.backoff(); continue; } http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java index f147d24..e5d0f8d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java @@ -54,11 +54,11 @@ final class SimpleKafkaPublisher implements KafkaPublisher { private final AtomicReference<Producer<Integer, ByteBuffer>> producer; private final AtomicBoolean listenerCancelled; - 
public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) { + SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) { this.brokerService = brokerService; this.ack = ack; this.compression = compression; - this.producer = new AtomicReference<Producer<Integer, ByteBuffer>>(); + this.producer = new AtomicReference<>(); this.listenerCancelled = new AtomicBoolean(false); } @@ -107,7 +107,7 @@ final class SimpleKafkaPublisher implements KafkaPublisher { @Override public Preparer add(ByteBuffer message, Object partitionKey) { - messages.add(new KeyedMessage<Integer, ByteBuffer>(topic, Math.abs(partitionKey.hashCode()), message)); + messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message)); return this; } @@ -159,30 +159,37 @@ final class SimpleKafkaPublisher implements KafkaPublisher { } String newBrokerList = brokerService.getBrokerList(); - if (newBrokerList.isEmpty()) { - LOG.warn("Broker list is empty. No Kafka producer is created."); - return; - } + // If there is no change, whether it is empty or not, just return if (Objects.equal(brokerList, newBrokerList)) { return; } - Properties props = new Properties(); - props.put("metadata.broker.list", newBrokerList); - props.put("serializer.class", ByteBufferEncoder.class.getName()); - props.put("key.serializer.class", IntegerEncoder.class.getName()); - props.put("partitioner.class", IntegerPartitioner.class.getName()); - props.put("request.required.acks", Integer.toString(ack.getAck())); - props.put("compression.codec", compression.getCodec()); + Producer<Integer, ByteBuffer> newProducer = null; + if (!newBrokerList.isEmpty()) { + Properties props = new Properties(); + props.put("metadata.broker.list", newBrokerList); + props.put("serializer.class", ByteBufferEncoder.class.getName()); + props.put("key.serializer.class", IntegerEncoder.class.getName()); + props.put("partitioner.class", IntegerPartitioner.class.getName()); + 
props.put("request.required.acks", Integer.toString(ack.getAck())); + props.put("compression.codec", compression.getCodec()); + + ProducerConfig config = new ProducerConfig(props); + newProducer = new Producer<>(config); + } - ProducerConfig config = new ProducerConfig(props); - Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config)); + // If the broker list is empty, the producer will be set to null + Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer); if (oldProducer != null) { oldProducer.close(); } - LOG.info("Update Kafka producer broker list: {}", newBrokerList); + if (newBrokerList.isEmpty()) { + LOG.warn("Empty Kafka producer broker list, publish will fail."); + } else { + LOG.info("Updated Kafka producer broker list: {}", newBrokerList); + } brokerList = newBrokerList; } } http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java index 2ffc604..de42b9b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java @@ -51,6 +51,8 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -58,6 +60,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; /** * A {@link 
BrokerService} that watches kafka zk nodes for updates of broker lists and leader for @@ -136,11 +139,13 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker return brokerList.get(); } - final SettableFuture<?> readerFuture = SettableFuture.create(); - final AtomicReference<Iterable<BrokerInfo>> brokers = - new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of()); + final SettableFuture<?> readyFuture = SettableFuture.create(); + final AtomicReference<List<BrokerInfo>> brokers = new AtomicReference<>(Collections.<BrokerInfo>emptyList()); actOnExists(BROKER_IDS_PATH, new Runnable() { + + final Runnable thisRunnable = this; + @Override public void run() { // Callback for fetching children list. This callback should be executed in the executorService. @@ -154,19 +159,19 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker Iterables.transform( brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(), Suppliers.<BrokerInfo>supplierFunction()))); - readerFuture.set(null); + readyFuture.set(null); for (ListenerExecutor listener : listeners) { listener.changed(ZKBrokerService.this); } } catch (ExecutionException e) { - readerFuture.setException(e.getCause()); + readyFuture.setException(e.getCause()); } } @Override public void onFailure(Throwable t) { - readerFuture.setException(t); + readyFuture.setException(t); } }; @@ -179,15 +184,25 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker } if (event.getType() == Event.EventType.NodeChildrenChanged) { Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService); + } else if (event.getType() == Event.EventType.NodeDeleted) { + // If the ids node is deleted, clear the broker list and re-watch. 
+ // This could happen when the Kafka server is restarted and have the ZK node cleanup + // The readyFuture for this call doesn't matter, as we don't need to block on anything + brokers.set(Collections.<BrokerInfo>emptyList()); + for (ListenerExecutor listener : listeners) { + listener.changed(ZKBrokerService.this); + } + actOnExists(BROKER_IDS_PATH, thisRunnable, SettableFuture.create(), + FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); } } }), childrenCallback, executorService); } - }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); + }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); - brokerList = createSupplier(brokers); + brokerList = this.<Iterable<BrokerInfo>>createSupplier(brokers); try { - readerFuture.get(); + readyFuture.get(); } catch (Exception e) { throw Throwables.propagate(e); } @@ -223,7 +238,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker public Supplier<T> load(final K key) throws Exception { // A future to tell if the result is ready, even it is failure. final SettableFuture<T> readyFuture = SettableFuture.create(); - final AtomicReference<T> resultValue = new AtomicReference<T>(); + final AtomicReference<T> resultValue = new AtomicReference<>(); // Fetch for node data when it exists. final String path = key.getPath(); @@ -312,7 +327,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker } }), new FutureCallback<Stat>() { @Override - public void onSuccess(Stat result) { + public void onSuccess(@Nullable Stat result) { if (result != null) { action.run(); } else { @@ -345,7 +360,7 @@ public final class ZKBrokerService extends AbstractIdleService implements Broker /** * Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}. */ - private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) { + private <T> Supplier<T> createSupplier(final AtomicReference<? 
extends T> ref) { return new Supplier<T>() { @Override public T get() { http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java index aa14a75..c219171 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java @@ -59,7 +59,7 @@ import javax.annotation.Nullable; public class Hadoop21YarnAppClient implements YarnAppClient { private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class); - private final Configuration configuration; + protected final Configuration configuration; public Hadoop21YarnAppClient(Configuration configuration) { this.configuration = configuration; @@ -108,7 +108,7 @@ public class Hadoop21YarnAppClient implements YarnAppClient { addRMToken(launchContext, yarnClient, appId); appSubmissionContext.setAMContainerSpec(launchContext); appSubmissionContext.setResource(capability); - appSubmissionContext.setMaxAppAttempts(2); + configureAppSubmissionContext(appSubmissionContext); yarnClient.submitApplication(appSubmissionContext); return new ProcessControllerImpl(appId); @@ -126,6 +126,19 @@ public class Hadoop21YarnAppClient implements YarnAppClient { } } + /** + * Updates the {@link ApplicationSubmissionContext} based on configuration. 
+ */ + protected void configureAppSubmissionContext(ApplicationSubmissionContext context) { + int maxAttempts = configuration.getInt(Configs.Keys.YARN_MAX_APP_ATTEMPTS, -1); + if (maxAttempts > 0) { + context.setMaxAppAttempts(maxAttempts); + } else { + // Preserve the old behavior + context.setMaxAppAttempts(2); + } + } + private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) { int maxMemory = response.getMaximumResourceCapability().getMemory(); int updatedMemory = capability.getMemory(); http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java index 97d2a64..0e3382f 100644 --- a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java +++ b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java @@ -48,14 +48,12 @@ import java.util.List; * </p> */ @SuppressWarnings("unused") -public final class Hadoop23YarnAppClient extends Hadoop21YarnAppClient { +public class Hadoop23YarnAppClient extends Hadoop21YarnAppClient { private static final Logger LOG = LoggerFactory.getLogger(Hadoop23YarnAppClient.class); - private final Configuration configuration; public Hadoop23YarnAppClient(Configuration configuration) { super(configuration); - this.configuration = configuration; } /** http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java 
b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java new file mode 100644 index 0000000..1c27518 --- /dev/null +++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.twill.internal.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.twill.api.Configs; + +/** + * <p> + * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.6 and beyond. + * + * The {@link VersionDetectYarnAppClientFactory} class will decide to return instance of this class for + * Apache Hadoop 2.6 and beyond. 
+ * </p> + */ +@SuppressWarnings("unused") +public class Hadoop26YarnAppClient extends Hadoop23YarnAppClient { + + public Hadoop26YarnAppClient(Configuration configuration) { + super(configuration); + } + + @Override + protected void configureAppSubmissionContext(ApplicationSubmissionContext context) { + super.configureAppSubmissionContext(context); + long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L); + if (interval > 0) { + context.setAttemptFailuresValidityInterval(interval); + } + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 445656d..7706d52 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -165,9 +165,10 @@ public final class ApplicationMasterMain extends ServiceMain { @Override protected void startUp() throws Exception { - ZKOperations.ignoreError( - zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT), - KeeperException.NodeExistsException.class, kafkaZKPath).get(); + // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is + // no left over content from previous AM attempt. 
+ LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); + ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get(); kafkaServer.startAndWait(); } http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 6fc31f5..8a80041 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -78,7 +78,9 @@ import org.apache.twill.internal.yarn.YarnContainerStatus; import org.apache.twill.internal.yarn.YarnUtils; import org.apache.twill.zookeeper.ZKClient; import org.apache.twill.zookeeper.ZKClients; +import org.apache.twill.zookeeper.ZKOperations; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -318,8 +320,12 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger")); - // Creates ZK path for runnable - zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get(); + // Creates ZK path for runnable. It's ok if the path already exists. 
+ // That's for the case when the AM get killed and restarted + ZKOperations.ignoreError( + zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT), + KeeperException.NodeExistsException.class, null) + .get(); runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX); runnableContainerRequests = initContainerRequests(); } http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java index c8e88c9..83de2a4 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java @@ -39,9 +39,13 @@ public final class VersionDetectYarnAppClientFactory implements YarnAppClientFac // 2.1 and 2.2 uses the same YarnAppClient clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient"; break; - default: + case HADOOP_23: // 2.3 and above uses the 2.3 YarnAppClient to support RM HA clzName = getClass().getPackage().getName() + ".Hadoop23YarnAppClient"; + break; + default: + // Anything above 2.3 will be 2.6 and beyond + clzName = getClass().getPackage().getName() + ".Hadoop26YarnAppClient"; } Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName); return clz.getConstructor(Configuration.class).newInstance(configuration); http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java 
b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index 335d7ec..8f844a2 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -68,6 +69,7 @@ final class YarnTwillController extends AbstractTwillController implements Twill private final TimeUnit startTimeoutUnit; private volatile ApplicationMasterLiveNodeData amLiveNodeData; private ProcessController<YarnApplicationReport> processController; + private ApplicationAttemptId currentAttemptId; // Thread for polling yarn for application status if application got ZK session expire. // Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback. @@ -141,6 +143,8 @@ final class YarnTwillController extends AbstractTwillController implements Twill LOG.info("Yarn application {} {} is not in running state. 
Shutting down controller.", appName, appId); forceShutDown(); } + + currentAttemptId = report.getCurrentApplicationAttemptId(); } catch (Exception e) { throw Throwables.propagate(e); } @@ -273,6 +277,13 @@ final class YarnTwillController extends AbstractTwillController implements Twill shutdown = true; break; } + ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId(); + if (currentAttemptId.compareTo(attemptId) != 0) { + LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId); + currentAttemptId = attemptId; + resetLogHandler(); + } + // Make a sync exists call to instance node and re-watch if the node exists try { // The timeout is arbitrary, as it's just for avoiding block forever http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java new file mode 100644 index 0000000..4f5adce --- /dev/null +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.twill.yarn; + +import com.google.common.io.Files; +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.EventHandler; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.LogEntry; +import org.apache.twill.api.logging.LogHandler; +import org.apache.twill.api.logging.PrinterLogHandler; +import org.apache.twill.internal.yarn.YarnUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.PrintWriter; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * Unit test for application master resilience. 
+ */ +public class AppRecoveryTestRun extends BaseYarnTest { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @Test + public void testAMRestart() throws Exception { + // Only run it with Hadoop-2.1 or above + Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0); + // Don't run this test on Mac, as there would be leftover java processes (HADOOP-12317) + // The test can be forced to run by turning on the "force-mac-tests" maven profile + // (presumably that profile sets the "force.mac.tests" system property checked below; confirm in pom.xml) + // After the test finishes, run the `jps` command and delete all `TwillLauncher` processes + Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) || + !System.getProperty("os.name").toLowerCase().contains("mac")); + + File watchFile = TEMP_FOLDER.newFile(); + // Only the path is needed: remove the file so that its later creation (Files.touch below) is the signal + // for the event handler thread of the first attempt to kill the AM + watchFile.delete(); + + // Start the testing app. The log handler below releases one semaphore permit for each of two log lines per + // attempt: the "Container for <TestRunnable simple name> launched" line logged by the event handler (AM) and + // the "Running 0" line logged by the runnable, i.e. 4 matching log lines in total across the two attempts + final Semaphore semaphore = new Semaphore(0); + TwillRunner runner = getTwillRunner(); + TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile))) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + // Use a log handler to match messages from the AM and the Runnable to make sure the log collection gets + // resumed correctly after the AM restarts + .addLogHandler(new LogHandler() { + @Override + public void onLog(LogEntry logEntry) { + String message = logEntry.getMessage(); + if (message.equals("Container for " + TestRunnable.class.getSimpleName() + " launched")) { + semaphore.release(); + } else if (message.equals("Running 0")) { + semaphore.release(); + } + } + }) + .start(); + + // Wait for the first attempt to be running (both matching log lines seen) + Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES)); + // Touch the watchFile so that the event handler will kill the AM + Files.touch(watchFile); + // Wait for the second attempt to be running (both matching log lines seen again) +
Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES)); + + controller.terminate().get(); + } + + /** + * A {@link EventHandler} for killing the first attempt of the application. + * + * <p>When a container whose ID contains {@code "_01_"} is launched, a daemon thread is started that waits for + * the watch file to exist and then halts the process running this handler (the AM). + */ + public static final class TestEventHandler extends EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class); + + /** Watch file given at construction; exposed to the running handler through {@link #getConfigs()}. */ + private File watchFile; + + TestEventHandler(File watchFile) { + this.watchFile = watchFile; + } + + /** + * Logs the container launch and, for containers whose ID contains {@code "_01_"} (presumably the first + * application attempt; confirm against the YARN container ID format), arms the AM kill thread. The watch + * file path is read from the specification configs (populated by {@link #getConfigs()}) rather than from + * the field. + */ + @Override + public void containerLaunched(String runnableName, int instanceId, String containerId) { + LOG.info("Container for {} launched", runnableName); + + if (containerId.contains("_01_")) { + final File watchFile = new File(context.getSpecification().getConfigs().get("watchFile")); + Thread t = new Thread() { + @Override + public void run() { + // Wait for the watch file to be available, then kill the process + while (!watchFile.exists()) { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + // halt() terminates the JVM immediately without running shutdown hooks, simulating an abrupt AM crash + Runtime.getRuntime().halt(-1); + } + }; + // Daemon thread, so it does not by itself keep the JVM alive + t.setDaemon(true); + t.start(); + } + } + + @Override + protected Map<String, String> getConfigs() { + return Collections.singletonMap("watchFile", watchFile.getAbsolutePath()); + } + } + + /** + * Application for testing: a single {@link TestRunnable} with the given {@link EventHandler}. + */ + public static final class TestApp implements TwillApplication { + + private final EventHandler eventHandler; + + public TestApp(EventHandler eventHandler) { + this.eventHandler = eventHandler; + } + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName("TestApp") + .withRunnable() + .add(new TestRunnable()).noLocalFiles() + .anyOrder() + .withEventHandler(eventHandler).build(); + } + } + + /** + * Runnable for testing. Logs {@code "Running <count>"}, with count starting at 0, roughly every 2 seconds until + * {@link #stop()} is called. An interrupt ends the loop after being logged; the interrupt status is not restored. + */ + public static final class TestRunnable extends AbstractTwillRunnable { + + private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class); + + private final CountDownLatch stopLatch = new CountDownLatch(1); + + @Override + public void run() { + long count =
0; + try { + while (!stopLatch.await(2, TimeUnit.SECONDS)) { + LOG.info("Running {}", count++); + } + } catch (InterruptedException e) { + LOG.info("Interrupted", e); + } + } + + @Override + public void stop() { + stopLatch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/107dc1e2/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java ---------------------------------------------------------------------- diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java index 0e2239d..bce6391 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java @@ -25,16 +25,21 @@ import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; import org.apache.twill.internal.zookeeper.SettableOperationFuture; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; /** * Collection of helper methods for common operations that usually needed when interacting with ZooKeeper. @@ -282,6 +287,73 @@ public final class ZKOperations { } /** + * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen + * and the creation will be retried.
+ * + * <p>NOTE(review): there is no retry limit. If the node keeps reappearing between the recursive delete and the + * next create attempt (e.g. re-created concurrently by another client), the delete/create cycle repeats + * indefinitely. + * + * @param zkClient the {@link ZKClient} used for both the creation and the recursive deletion + * @param path the path of the node to create + * @param data the node data; may be {@code null} + * @param createMode the {@link CreateMode} of the node + * @param createParent passed through to {@link ZKClient#create} on every creation attempt + * @param acls ACLs for the node; when none are given, {@link ZooDefs.Ids#OPEN_ACL_UNSAFE} is used + * @return an {@link OperationFuture} that completes with the result of the successful create call. It fails with + * the create failure if that failure is not a {@code NodeExistsException}, or with the + * {@code NodeExistsException} (carrying the deletion failure as a suppressed exception) if the recursive + * delete fails for a reason other than {@code NoNodeException} + */ + public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path, + @Nullable final byte[] data, final CreateMode createMode, + final boolean createParent, final ACL...acls) { + final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path, + Threads.SAME_THREAD_EXECUTOR); + final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls); + createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() { + + final FutureCallback<String> createCallback = this; + + @Override + public void onSuccess(String result) { + // Create succeeded, just set the result to the resultFuture + resultFuture.set(result); + } + + @Override + public void onFailure(final Throwable createFailure) { + // If create failed for any reason other than NodeExistsException, just set the exception to the result + // future + if (!(createFailure instanceof KeeperException.NodeExistsException)) { + resultFuture.setException(createFailure); + return; + } + + // Try to delete the path. On success (or if the node is already gone), creation is retried with + // createCallback, i.e. this same callback, so a new NodeExistsException triggers another delete + LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path); + Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() { + @Override + public void onSuccess(String result) { + // If delete succeeded, perform the creation again.
+ createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } + + @Override + public void onFailure(Throwable t) { + // If deletion failed for any reason other than NoNodeException, fail the result operation future with the + // original creation failure, attaching the deletion failure as a suppressed exception + if (!(t instanceof KeeperException.NoNodeException)) { + createFailure.addSuppressed(t); + resultFuture.setException(createFailure); + return; + } + + // If the delete failed because the node no longer exists, just go ahead and recreate the node + createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback); + } + }, Threads.SAME_THREAD_EXECUTOR); + } + }); + + return resultFuture; + } + + /** + * Private helper method to create a ZK node based on the parameters. The result of the creation is always + * communicated via the provided {@link FutureCallback}, which is invoked on {@link Threads#SAME_THREAD_EXECUTOR}. + */ + private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data, + CreateMode createMode, boolean createParent, + Iterable<ACL> acls, FutureCallback<String> callback) { + Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls), + callback, Threads.SAME_THREAD_EXECUTOR); + } + + /** * Watch for the given path until it exists. * @param zkClient The {@link ZKClient} to use. * @param path A ZooKeeper path to watch for existent.
