SAMZA-1165: Clean up old ZK versions. Author: Boris Shkolnik <[email protected]>
Reviewers: Navina <[email protected]>, Shanthoosh V<[email protected]> Closes #239 from sborya/zkCleanUpBarrier1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4eb51531 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4eb51531 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4eb51531 Branch: refs/heads/0.14.0 Commit: 4eb51531387f018ea4350424edc45516ac3aea46 Parents: ce77716 Author: Boris Shkolnik <[email protected]> Authored: Fri Jul 21 13:43:18 2017 -0700 Committer: Jagadish <[email protected]> Committed: Fri Jul 21 13:43:18 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/TaskConfigJava.java | 2 +- .../samza/zk/ScheduleAfterDebounceTime.java | 8 ++ .../samza/zk/ZkBarrierForVersionUpgrade.java | 7 +- .../org/apache/samza/zk/ZkJobCoordinator.java | 3 + .../main/java/org/apache/samza/zk/ZkUtils.java | 62 ++++++++++ .../java/org/apache/samza/zk/TestZkUtils.java | 112 ++++++++++++++++++- .../samza/processor/TestZkStreamProcessor.java | 9 +- .../processor/TestZkStreamProcessorBase.java | 2 +- .../processor/TestZkStreamProcessorSession.java | 3 + .../processor/TestZkLocalApplicationRunner.java | 3 + 10 files changed, 199 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java index fc9f165..0bf078e 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java @@ -36,7 +36,7 @@ import scala.collection.JavaConverters; public class 
TaskConfigJava extends MapConfig { // Task Configs - private static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; + public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; public static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L; // broadcast streams consumed by all tasks. e.g. kafka.foo#1 http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index 9b8ea66..6174063 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -48,6 +48,14 @@ public class ScheduleAfterDebounceTime { // Action name when the Processor membership changes public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; + /** + * + * cleanup process is started after every new job model generation is complete. + * It deletes old versions of job model and the barrier. + * How many to delete (or to leave) is controlled by @see org.apache.samza.zk.ZkJobCoordinator#NUM_VERSIONS_TO_LEAVE. 
+ **/ + public static final String ON_ZK_CLEANUP = "OnCleanUp"; + private final ScheduledTaskFailureCallback scheduledTaskFailureCallback; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 196e431..3257ee1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -117,9 +117,7 @@ public class ZkBarrierForVersionUpgrade { * @param version Version associated with the Barrier */ public void expire(String version) { - zkUtils.writeData( - keyBuilder.getBarrierStatePath(version), - State.TIMED_OUT); + zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT); } /** @@ -222,4 +220,7 @@ public class ZkBarrierForVersionUpgrade { } } + public static int getVersion(String barrierPath) { + return Integer.valueOf(barrierPath.substring(barrierPath.lastIndexOf('_') + 1)); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 298c96e..dd08e3f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -60,6 +60,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // TODO: MetadataCache 
timeout has to be 0 for the leader so that it can always have the latest information associated // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 private static final int METADATA_CACHE_TTL_MS = 5000; + private static final int NUM_VERSIONS_TO_LEAVE = 10; private final ZkUtils zkUtils; private final String processorId; @@ -202,6 +203,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion); LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion); + + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_ZK_CLEANUP, 0, () -> zkUtils.cleanupZK(NUM_VERSIONS_TO_LEAVE)); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 8b6bc52..5df7114 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -22,6 +22,7 @@ package org.apache.samza.zk; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.TreeSet; @@ -32,6 +33,7 @@ import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsRegistry; @@ -459,6 +461,66 @@ public class ZkUtils { } /** + * cleanup old data from ZK + * @param numVersionsToLeave - number of 
versions to leave + */ + public void cleanupZK(int numVersionsToLeave) { + deleteOldBarrierVersions(numVersionsToLeave); + deleteOldJobModels(numVersionsToLeave); + } + + void deleteOldJobModels(int numVersionsToLeave) { + // read current list of JMs + String path = keyBuilder.getJobModelPathPrefix(); + LOG.info("about to delete jm path=" + path); + List<String> znodeIds = zkClient.getChildren(path); + deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + // jm version name format is <num> + return Integer.valueOf(o1) - Integer.valueOf(o2); + } + }); + } + + void deleteOldBarrierVersions(int numVersionsToLeave) { + // read current list of barriers + String path = keyBuilder.getJobModelVersionBarrierPrefix(); + LOG.info("about to delete old barrier paths from " + path); + List<String> znodeIds = zkClient.getChildren(path); + LOG.info("List of all zkNodes: " + znodeIds); + deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + // barrier's name format is barrier_<num> + return ZkBarrierForVersionUpgrade.getVersion(o1) - ZkBarrierForVersionUpgrade.getVersion(o2); + } + }); + } + + void deleteOldVersionPath(String path, List<String> zNodeIds, int numVersionsToLeave, Comparator<String> c) { + if (StringUtils.isEmpty(path) || zNodeIds == null) { + LOG.warn("cannot cleanup empty path or empty list in ZK"); + return; + } + if (zNodeIds.size() > numVersionsToLeave) { + Collections.sort(zNodeIds, c); + // get the znodes to delete + int size = zNodeIds.size(); + List<String> zNodesToDelete = zNodeIds.subList(0, zNodeIds.size() - numVersionsToLeave); + LOG.info("Starting cleanup of barrier version zkNodes. 
From size=" + size + " to size " + zNodesToDelete.size() + "; numberToLeave=" + numVersionsToLeave); + for (String znodeId : zNodesToDelete) { + String pathToDelete = path + "/" + znodeId; + try { + LOG.info("deleting " + pathToDelete); + zkClient.deleteRecursive(pathToDelete); + } catch (Exception e) { + LOG.warn("delete of node " + pathToDelete + " failed.", e); + } + } + } + } + /** * Represents zookeeper processor node. */ private static class ProcessorNode { http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index e7a9aa2..b5953d1 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,6 +18,10 @@ */ package org.apache.samza.zk; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,8 +42,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; -import org.junit.rules.ExpectedException; import org.junit.Test; +import org.junit.rules.ExpectedException; public class TestZkUtils { private static EmbeddedZookeeper zkServer = null; @@ -102,7 +106,6 @@ public class TestZkUtils { // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath)); - } @Test @@ -237,6 +240,111 @@ public class TestZkUtils { Assert.assertEquals(jobModel, zkUtils.getJobModel(version)); } + @Test + public void testCleanUpZkJobModels() { + String root = 
zkUtils.getKeyBuilder().getJobModelPathPrefix(); + System.out.println("root=" + root); + zkUtils.getZkClient().createPersistent(root, true); + + // generate multiple version + for (int i = 101; i < 110; i++) { + zkUtils.publishJobModel(String.valueOf(i), null); + } + + // clean all of the versions except 5 most recent ones + zkUtils.deleteOldJobModels(5); + Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zkUtils.getZkClient().getChildren(root)); + } + + @Test + public void testCleanUpZkBarrierVersion() { + String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(); + zkUtils.getZkClient().createPersistent(root, true); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null); + for (int i = 200; i < 210; i++) { + barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c"))); + } + + zkUtils.deleteOldBarrierVersions(5); + List<String> zNodeIds = zkUtils.getZkClient().getChildren(root); + Collections.sort(zNodeIds); + Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"), + zNodeIds); + } + + @Test + public void testCleanUpZk() { + String pathA = "/path/testA"; + String pathB = "/path/testB"; + zkUtils.getZkClient().createPersistent(pathA, true); + zkUtils.getZkClient().createPersistent(pathB, true); + + // Create 100 nodes + for (int i = 0; i < 20; i++) { + String p1 = pathA + "/" + i; + zkUtils.getZkClient().createPersistent(p1, true); + zkUtils.getZkClient().createPersistent(p1 + "/something1", true); + zkUtils.getZkClient().createPersistent(p1 + "/something2", true); + + String p2 = pathB + "/some_" + i; + zkUtils.getZkClient().createPersistent(p2, true); + zkUtils.getZkClient().createPersistent(p2 + "/something1", true); + zkUtils.getZkClient().createPersistent(p2 + "/something2", true); + } + + List<String> zNodeIds = new ArrayList<>(); + // empty list + zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new 
Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }); + + + zNodeIds = zkUtils.getZkClient().getChildren(pathA); + zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return Integer.valueOf(o1) - Integer.valueOf(o2); + } + }); + + for (int i = 0; i < 10; i++) { + // should be gone + String p1 = pathA + "/" + i; + Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); + } + + for (int i = 10; i < 20; i++) { + // should be gone + String p1 = pathA + "/" + i; + Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); + } + + zNodeIds = zkUtils.getZkClient().getChildren(pathB); + zkUtils.deleteOldVersionPath(pathB, zNodeIds, 1, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return Integer.valueOf(o1.substring(o1.lastIndexOf("_") + 1)) - Integer + .valueOf(o2.substring(o2.lastIndexOf("_") + 1)); + } + }); + + for (int i = 0; i < 19; i++) { + // should be gone + String p1 = pathB + "/" + i; + Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); + } + + for (int i = 19; i < 20; i++) { + // should be gone + String p1 = pathB + "/some_" + i; + Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); + } + + } + public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) { long delay = startDelayMs; while (delay < maxDelayMs) { http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java index 1a13825..7253b29 100644 --- 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java @@ -20,7 +20,6 @@ package org.apache.samza.processor; import java.util.concurrent.CountDownLatch; -import org.apache.samza.zk.TestZkUtils; import org.junit.Assert; import org.junit.Test; @@ -146,8 +145,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { LOG.info("containerStopped latch = " + containerStopped1); waitForProcessorToStartStop(containerStopped1); - // let the system to publish and distribute the new job model - TestZkUtils.sleepMs(600); + // read again the first batch + waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount); // produce the second batch of the messages, starting with 'messageCount' produceMessages(messageCount, inputTopic, messageCount); @@ -226,8 +225,8 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase { LOG.info("containerStopped latch = " + containerStopped2); waitForProcessorToStartStop(containerStopped2); - // let the system to publish and distribute the new job model - TestZkUtils.sleepMs(300); + // read again the first batch + waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount); // produce the second batch of the messages, starting with 'messageCount' produceMessages(messageCount, inputTopic, messageCount); http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index 4cbe252..f2f1585 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ 
-287,7 +287,7 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness System.out.println("2read all. current count = " + leftEventsCount); break; } - TestZkUtils.sleepMs(3000); + TestZkUtils.sleepMs(5000); attempts--; } Assert.assertTrue("Didn't read all the leftover events in " + ATTEMPTS_NUMBER + " attempts", attempts > 0); http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java index 10b08d9..6aab5e3 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java @@ -112,6 +112,9 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase { waitForProcessorToStartStop(containerStopLatches[i]); } + // read again the first batch + waitUntilMessagesLeftN(totalEventsToGenerate - 2 * messageCount); + produceMessages(messageCount, inputTopic, messageCount); waitUntilMessagesLeftN(0); http://git-wip-us.apache.org/repos/asf/samza/blob/4eb51531/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 77e2a49..ebbe07b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -44,6 +44,7 @@ import 
org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; @@ -83,6 +84,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory"; private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory"; private static final String TEST_JOB_NAME = "test-job"; + private static final String TASK_SHUTDOWN_MS = "5000"; private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"}; private String inputKafkaTopic; @@ -169,6 +171,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne .put(ApplicationConfig.APP_ID, appId) .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY) .put(JobConfig.JOB_NAME(), TEST_JOB_NAME) + .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS) .build(); Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig); applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
