Repository: samza Updated Branches: refs/heads/master 4d7b3b353 -> d104013ef
SAMZA-1107:Job model publish add utils for publishing job model and job model version to ZK. Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Author: navina <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Fred Ji <[email protected]> Closes #67 from sborya/JobModelPublish1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d104013e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d104013e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d104013e Branch: refs/heads/master Commit: d104013ef16ffd959916a52e9e3f6e67a6e486b3 Parents: 4d7b3b3 Author: Boris Shkolnik <[email protected]> Authored: Wed Mar 1 13:49:29 2017 -0800 Committer: navina <[email protected]> Committed: Wed Mar 1 13:49:29 2017 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/samza/zk/ZkUtils.java | 75 +++++++++++++++++++- .../org/apache/samza/zk/TestZkKeyBuilder.java | 12 ++++ .../java/org/apache/samza/zk/TestZkUtils.java | 46 ++++++++++-- 3 files changed, 128 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 320cd49..73376b1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,6 +19,7 @@ package org.apache.samza.zk; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -27,6 +28,11 @@ import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import 
org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.apache.samza.SamzaException; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.zookeeper.data.Stat; +import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,6 +161,44 @@ public class ZkUtils { } /** + * Publishes new job model into ZK. + * This call should FAIL if the node already exists. + * @param jobModelVersion version of the jobModeL to publish + * @param jobModel jobModel to publish + * + */ + public void publishJobModel(String jobModelVersion, JobModel jobModel) { + try { + ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); + String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel); + LOG.info("pid=" + processorId + " jobModelAsString=" + jobModelStr); + zkClient.createPersistent(keyBuilder.getJobModelPath(jobModelVersion), jobModelStr); + LOG.info("wrote jobModel path =" + keyBuilder.getJobModelPath(jobModelVersion)); + } catch (Exception e) { + LOG.error("JobModel publish failed for version=" + jobModelVersion, e); + throw new SamzaException(e); + } + } + + /** + * get the job model from ZK by version + * @param jobModelVersion jobModel version to get + * @return job model for this version + */ + public JobModel getJobModel(String jobModelVersion) { + LOG.info("pid=" + processorId + "read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion)); + Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion)); + ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); + JobModel jm; + try { + jm = mmapper.readValue((String) data, JobModel.class); + } catch (IOException e) { + throw new SamzaException("failed to read JobModel from ZK", e); + } + return jm; + } + + /** * read the jobmodel version from ZK * @return jobmodel version 
as a string */ @@ -163,6 +207,36 @@ public class ZkUtils { } /** + * publish the version number of the next JobModel + * @param oldVersion - used to validate, that no one has changed the version in the meanwhile. + * @param newVersion - new version. + */ + public void publishJobModelVersion(String oldVersion, String newVersion) { + Stat stat = new Stat(); + String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat); + LOG.info("pid=" + processorId + " publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat + .getVersion() + ")"); + + if (currentVersion != null && !currentVersion.equals(oldVersion)) { + throw new SamzaException( + "Someone change JobModelVersion while the leader was generating one: expected" + oldVersion + ", got " + currentVersion); + } + // data version is the ZK version of the data from the ZK. + int dataVersion = stat.getVersion(); + try { + stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion); + } catch (Exception e) { + String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion; + LOG.error(msg, e); + throw new SamzaException(msg); + } + LOG.info("pid=" + processorId + + " published new version: " + newVersion + "; expected data version = " + (dataVersion + 1) + "(actual data version after update = " + stat.getVersion() + + ")"); + } + + + /** * verify that given paths exist in ZK * @param paths - paths to verify or create */ @@ -190,5 +264,4 @@ public class ZkUtils { zkClient.deleteRecursive(rootPath); } } - } http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java index 8e048b2..b56d279 100644 --- 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java @@ -50,4 +50,16 @@ public class TestZkKeyBuilder { Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null)); Assert.assertNull(ZkKeyBuilder.parseIdFromPath("")); } + + @Test + public void testJobModelPath() { + /* Every path from a builder constructed with "test" is expected to start with "/test/". */ + ZkKeyBuilder builder = new ZkKeyBuilder("test"); + + Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath()); + Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix()); + String version = "2"; + Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version)); + Assert.assertEquals("/test/versionBarriers", builder.getJobModelVersionBarrierPrefix()); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/d104013e/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index b719e28..58c3ed6 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,11 +18,17 @@ */ package org.apache.samza.zk; +import java.util.HashMap; +import java.util.Map; import java.util.function.BooleanSupplier; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.samza.SamzaException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.After; import org.junit.AfterClass; @@ -60,7 +66,6 @@ public class TestZkUtils { // Do nothing } - zkUtils = new ZkUtils( KEY_BUILDER, 
zkClient, @@ -96,11 +101,9 @@ public class TestZkUtils { public void testGetActiveProcessors() { Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size()); zkUtils.registerProcessorAndGetId("processorData"); - Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size()); - } - + @Test public void testSubscribeToJobModelVersionChange() { @@ -157,6 +160,41 @@ public class TestZkUtils { Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000)); } + @Test + public void testPublishNewJobModel() { + ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); + String root = keyBuilder.getRootPath(); + zkClient.deleteRecursive(root); + String version = "1"; + String oldVersion = "0"; + + zkUtils.makeSurePersistentPathsExists( + new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()}); + + zkUtils.publishJobModelVersion(oldVersion, version); + Assert.assertEquals(version, zkUtils.getJobModelVersion()); + + String newerVersion = Long.toString(Long.parseLong(version) + 1); + zkUtils.publishJobModelVersion(version, newerVersion); + Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion()); + + try { + zkUtils.publishJobModelVersion(oldVersion, "10"); // stale old version: the current version is newerVersion + Assert.fail("publish invalid version should've failed"); + } catch (SamzaException e) { + // expected + } + + // create job model + Map<String, String> configMap = new HashMap<>(); + Map<Integer, ContainerModel> containers = new HashMap<>(); + MapConfig config = new MapConfig(configMap); + JobModel jobModel = new JobModel(config, containers); + + zkUtils.publishJobModel(version, jobModel); + Assert.assertEquals(jobModel, zkUtils.getJobModel(version)); + } + public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) { long delay = startDelayMs; while (delay < maxDelayMs) {
