Repository: samza Updated Branches: refs/heads/master 19c6f4f61 -> 160927ada
SAMZA-1859: Zookeeper implementation of MetadataStore. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Daniel Nishumura <[email protected]> Closes #629 from shanthoosh/metadata_store_zk_impl Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/160927ad Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/160927ad Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/160927ad Branch: refs/heads/master Commit: 160927adad9843cf5110b341cb0b67413f75249d Parents: 19c6f4f Author: Shanthoosh Venkataraman <[email protected]> Authored: Wed Sep 19 12:16:15 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Sep 19 12:16:15 2018 -0700 ---------------------------------------------------------------------- .../samza/metadatastore/MetadataStore.java | 6 +- .../apache/samza/container/LocalityManager.java | 2 +- .../grouper/task/TaskAssignmentManager.java | 2 +- .../metadatastore/CoordinatorStreamStore.java | 2 +- .../java/org/apache/samza/zk/ProcessorData.java | 19 +-- .../samza/zk/ZkJobCoordinatorFactory.java | 17 +-- .../java/org/apache/samza/zk/ZkKeyBuilder.java | 30 +++-- .../org/apache/samza/zk/ZkMetadataStore.java | 132 +++++++++++++++++++ .../apache/samza/zk/ZkMetadataStoreFactory.java | 36 +++++ .../apache/samza/container/SamzaContainer.scala | 26 ++-- .../TestCoordinatorStreamStore.java | 2 +- .../org/apache/samza/zk/TestZkKeyBuilder.java | 2 +- .../apache/samza/zk/TestZkMetadataStore.java | 121 +++++++++++++++++ 13 files changed, 340 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java index aaa420b..cd04794 100644 --- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java +++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java @@ -19,8 +19,6 @@ package org.apache.samza.metadatastore; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.config.Config; -import org.apache.samza.metrics.MetricsRegistry; import java.util.Map; /** @@ -32,10 +30,8 @@ public interface MetadataStore { /** * Initializes the metadata store, if applicable, setting up the underlying resources * and connections to the store endpoints. - * - * @param config the configuration for instantiating the MetadataStore. */ - void init(Config config, MetricsRegistry metricsRegistry); + void init(); /** * Gets the value associated with the specified {@code key}. http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index c70b15a..20e86d9 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -78,7 +78,7 @@ public class LocalityManager { this.config = config; MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class); this.metadataStore = metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, metricsRegistry); - this.metadataStore.init(config, metricsRegistry); + this.metadataStore.init(); this.keySerde = keySerde; this.valueSerde = valueSerde; this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde); http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 42a6e81..2bfd4c3 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -84,7 +84,7 @@ public class TaskAssignmentManager { } public void init(Config config, MetricsRegistry metricsRegistry) { - this.metadataStore.init(config, metricsRegistry); + this.metadataStore.init(); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java index d74188b..af5e2f9 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java @@ -89,7 +89,7 @@ public class CoordinatorStreamStore implements MetadataStore { } @Override - public void init(Config config, MetricsRegistry metricsRegistry) { + public void init() { if (isInitialized.compareAndSet(false, true)) { LOG.info("Starting the coordinator stream system consumer with config: {}.", config); registerConsumer(); http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java index a48a450..91ba33d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java @@ -21,17 +21,18 @@ package org.apache.samza.zk; import java.util.Objects; import org.apache.samza.SamzaException; +import org.apache.samza.runtime.LocationId; /** * Represents processor data stored in zookeeper processors node. */ public class ProcessorData { private final String processorId; - private final String host; + private final String locationId; - public ProcessorData(String host, String processorId) { + public ProcessorData(String locationId, String processorId) { this.processorId = processorId; - this.host = host; + this.locationId = locationId; } public ProcessorData(String data) { @@ -39,16 +40,16 @@ public class ProcessorData { if (splt.length != 2) { throw new SamzaException("incorrect processor data format = " + data); } - host = splt[0]; + locationId = splt[0]; processorId = splt[1]; } public String toString() { - return host + " " + processorId; + return locationId + " " + processorId; } - public String getHost() { - return host; + public LocationId getLocationId() { + return new LocationId(locationId); } public String getProcessorId() { @@ -57,7 +58,7 @@ public class ProcessorData { @Override public int hashCode() { - return Objects.hash(processorId, host); + return Objects.hash(processorId, locationId); } @Override @@ -65,6 +66,6 @@ public class ProcessorData { if (obj == null) return false; if (getClass() != obj.getClass()) return false; final ProcessorData other = (ProcessorData) obj; - return Objects.equals(processorId, other.processorId) && Objects.equals(host, other.host); + return Objects.equals(processorId, other.processorId) && Objects.equals(locationId, other.locationId); } } http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index 6888df0..3dad6c1 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -31,7 +31,6 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class); @@ -40,22 +39,24 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { private static final String DEFAULT_JOB_NAME = "defaultJob"; /** - * Method to instantiate an implementation of JobCoordinator + * Instantiates an {@link ZkJobCoordinator} using the {@link Config}. * - * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig" - * @return An instance of IJobCoordinator + * @param config zookeeper configurations required for instantiating {@link ZkJobCoordinator} + * @return An instance of {@link ZkJobCoordinator} */ @Override public JobCoordinator getJobCoordinator(Config config) { + // TODO: Separate JC related configs into a "ZkJobCoordinatorConfig" MetricsRegistry metricsRegistry = new MetricsRegistryMap(); - ZkUtils zkUtils = getZkUtils(config, metricsRegistry); - LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config); + String jobCoordinatorZkBasePath = getJobCoordinationZkPath(config); + ZkUtils zkUtils = getZkUtils(config, metricsRegistry, jobCoordinatorZkBasePath); + LOG.debug("Creating ZkJobCoordinator with config: {}.", config); return new ZkJobCoordinator(config, metricsRegistry, zkUtils); } - private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) { + private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry, String coordinatorZkBasePath) { ZkConfig zkConfig = new ZkConfig(config); - ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config)); + ZkKeyBuilder keyBuilder = new ZkKeyBuilder(coordinatorZkBasePath); ZkClient zkClient = ZkCoordinationUtilsFactory .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), metricsRegistry); http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java index 37bff6d..16efe81 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -42,16 +42,18 @@ import com.google.common.base.Strings; * This class provides helper methods to easily generate/parse the path in the ZK hierarchy. */ public class ZkKeyBuilder { + + static final String PROCESSORS_PATH = "processors"; + static final String JOBMODEL_GENERATION_PATH = "jobModelGeneration"; + static final String JOB_MODEL_UPGRADE_BARRIER_PATH = "jobModelUpgradeBarrier"; + private static final String TASK_LOCALITY_PATH = "taskLocality"; + /** * Prefix generated to uniquely identify a particular deployment of a job. * TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well. */ private final String pathPrefix; - static final String PROCESSORS_PATH = "processors"; - static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration"; - static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier"; - public ZkKeyBuilder(String pathPrefix) { if (pathPrefix != null && !pathPrefix.trim().isEmpty()) { this.pathPrefix = pathPrefix.trim(); @@ -60,11 +62,11 @@ public class ZkKeyBuilder { } } - public String getRootPath() { + String getRootPath() { return "/" + pathPrefix; } - public String getProcessorsPath() { + String getProcessorsPath() { return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH); } @@ -77,25 +79,29 @@ public class ZkKeyBuilder { * @param path Full ZK path of a registered processor * @return String representing the processor ID */ - public static String parseIdFromPath(String path) { + static String parseIdFromPath(String path) { if (!Strings.isNullOrEmpty(path)) return path.substring(path.lastIndexOf("/") + 1); return null; } - public String getJobModelVersionPath() { + String getJobModelVersionPath() { return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH); } - public String getJobModelPathPrefix() { + String getJobModelPathPrefix() { return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH); } - public String getJobModelPath(String jobModelVersion) { + String getJobModelPath(String jobModelVersion) { return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion); } - public String getJobModelVersionBarrierPrefix() { - return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER); + String getJobModelVersionBarrierPrefix() { + return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, JOB_MODEL_UPGRADE_BARRIER_PATH); + } + + String getTaskLocalityPath() { + return String.format("%s/%s", getRootPath(), TASK_LOCALITY_PATH); } } http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java new file mode 100644 index 0000000..4cfdc8d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStore.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.nio.charset.Charset; +import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; +import org.apache.samza.config.Config; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.SamzaException; +import org.I0Itec.zkclient.ZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of the {@link MetadataStore} interface where the + * metadata of the Samza job is stored in zookeeper. + */ +public class ZkMetadataStore implements MetadataStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZkMetadataStore.class); + + private final ZkClient zkClient; + private final ZkConfig zkConfig; + private final String zkBaseDir; + + public ZkMetadataStore(String zkBaseDir, Config config, MetricsRegistry metricsRegistry) { + this.zkConfig = new ZkConfig(config); + this.zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs(), new BytesPushThroughSerializer()); + this.zkBaseDir = zkBaseDir; + zkClient.createPersistent(zkBaseDir, true); + } + + /** + * {@inheritDoc} + */ + @Override + public void init() { + zkClient.waitUntilConnected(zkConfig.getZkConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } + + /** + * {@inheritDoc} + */ + @Override + public byte[] get(byte[] key) { + return zkClient.readData(getZkPathForKey(key), true); + } + + /** + * {@inheritDoc} + */ + @Override + public void put(byte[] key, byte[] value) { + String zkPath = getZkPathForKey(key); + zkClient.createPersistent(zkPath, true); + zkClient.writeData(zkPath, value); + } + + /** + * {@inheritDoc} + */ + @Override + public void delete(byte[] key) { + zkClient.delete(getZkPathForKey(key)); + } + + /** + * {@inheritDoc} + * @throws SamzaException if there're exceptions reading data from zookeeper. + */ + @Override + public Map<byte[], byte[]> all() { + try { + List<String> zkSubDirectories = zkClient.getChildren(zkBaseDir); + Map<byte[], byte[]> result = new HashMap<>(); + for (String zkSubDir : zkSubDirectories) { + String completeZkPath = String.format("%s/%s", zkBaseDir, zkSubDir); + byte[] value = zkClient.readData(completeZkPath, true); + if (value != null) { + result.put(completeZkPath.getBytes("UTF-8"), value); + } + } + return result; + } catch (Exception e) { + String errorMsg = String.format("Error reading path: %s from zookeeper.", zkBaseDir); + LOG.error(errorMsg, e); + throw new SamzaException(errorMsg, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void flush() { + // No-op for zookeeper implementation. + } + + /** + * {@inheritDoc} + */ + @Override + public void close() { + zkClient.close(); + } + + private String getZkPathForKey(byte[] key) { + return String.format("%s/%s", zkBaseDir, new String(key, Charset.forName("UTF-8"))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java new file mode 100644 index 0000000..a9c979d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import org.apache.samza.config.Config; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.metadatastore.MetadataStoreFactory; +import org.apache.samza.metrics.MetricsRegistry; + +/** + * Builds the {@link ZkMetadataStore} based upon the provided {@link Config} + * and {@link MetricsRegistry}. + */ +public class ZkMetadataStoreFactory implements MetadataStoreFactory { + + @Override + public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) { + return new ZkMetadataStore(namespace, config, metricsRegistry); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 68de4a6..7b64f5e 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -129,12 +129,6 @@ object SamzaContainer extends Logging { val containerName = "samza-container-%s" format containerId val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions - var localityManager: LocalityManager = null - if (new ClusterManagerConfig(config).getHostAffinityEnabled()) { - val registryMap = new MetricsRegistryMap(containerName) - localityManager = new LocalityManager(config, registryMap) - } - val containerPID = ManagementFactory.getRuntimeMXBean().getName() info("Setting up Samza container: %s" format containerName) @@ -719,7 +713,6 @@ object SamzaContainer extends Logging { consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, offsetManager = offsetManager, - localityManager = localityManager, securityManager = securityManager, metrics = samzaContainerMetrics, reporters = reporters, @@ -799,7 +792,7 @@ class SamzaContainer( startDiagnostics startAdmins startOffsetManager - startLocalityManager + storeContainerLocality startStores startTableManager startDiskSpaceMonitor @@ -841,7 +834,6 @@ class SamzaContainer( shutdownDiskSpaceMonitor shutdownHostStatisticsMonitor shutdownProducers - shutdownLocalityManager shutdownOffsetManager shutdownMetrics shutdownSecurityManger @@ -961,8 +953,10 @@ class SamzaContainer( offsetManager.start } - def startLocalityManager { - if(localityManager != null) { + def storeContainerLocality { + val isHostAffinityEnabled: Boolean = new ClusterManagerConfig(containerContext.config).getHostAffinityEnabled + if (isHostAffinityEnabled) { + val localityManager: LocalityManager = new LocalityManager(containerContext.config, containerContext.metricsRegistry) val containerName = "SamzaContainer-" + String.valueOf(containerContext.id) info("Registering %s with metadata store" format containerName) try { @@ -978,6 +972,9 @@ class SamzaContainer( case unknownException: Throwable => warn("Received an exception when persisting locality info for container %s: " + "%s" format (containerContext.id, unknownException.getMessage)) + } finally { + info("Shutting down locality manager.") + localityManager.close() } } } @@ -1145,13 +1142,6 @@ class SamzaContainer( taskInstances.values.foreach(_.shutdownTableManager) } - def shutdownLocalityManager { - if(localityManager != null) { - info("Shutting down locality manager.") - localityManager.close() - } - } - def shutdownOffsetManager { info("Shutting down offset manager.") http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java index da2d984..0e48363 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java @@ -57,7 +57,7 @@ public class TestCoordinatorStreamStore { when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test")); when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test"); coordinatorStreamStore = new CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap), new MetricsRegistryMap()); - coordinatorStreamStore.init(new MapConfig(), new MetricsRegistryMap()); + coordinatorStreamStore.init(); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java index 8ddd688..d2175b2 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java @@ -56,6 +56,6 @@ public class TestZkKeyBuilder { Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix()); String version = "2"; Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version)); - Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER + "/versionBarriers", builder.getJobModelVersionBarrierPrefix()); + Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/" + ZkKeyBuilder.JOB_MODEL_UPGRADE_BARRIER_PATH + "/versionBarriers", builder.getJobModelVersionBarrierPrefix()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/160927ad/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java new file mode 100644 index 0000000..5930c65 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestZkMetadataStore { + + private static final String LOCALHOST = "127.0.0.1"; + + private static EmbeddedZookeeper zkServer; + + private MetadataStore zkMetadataStore; + + @BeforeClass + public static void beforeClass() { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + } + + @AfterClass + public static void afterClass() { + zkServer.teardown(); + } + + @Before + public void beforeTest() { + String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort()); + Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString)); + zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap()); + } + + @After + public void afterTest() { + zkMetadataStore.close(); + } + + @Test + public void testReadAfterWrite() throws Exception { + byte[] key = "test-key1".getBytes("UTF-8"); + byte[] value = "test-value1".getBytes("UTF-8"); + Assert.assertNull(zkMetadataStore.get(key)); + zkMetadataStore.put(key, value); + Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key))); + Assert.assertEquals(1, zkMetadataStore.all().size()); + } + + @Test + public void testReadAfterDelete() throws Exception { + byte[] key = "test-key1".getBytes("UTF-8"); + byte[] value = "test-value1".getBytes("UTF-8"); + Assert.assertNull(zkMetadataStore.get(key)); + zkMetadataStore.put(key, value); + Assert.assertTrue(Arrays.equals(value, zkMetadataStore.get(key))); + zkMetadataStore.delete(key); + Assert.assertNull(zkMetadataStore.get(key)); + Assert.assertEquals(0, zkMetadataStore.all().size()); + } + + @Test + public void testReadOfNonExistentKey() throws Exception { + Assert.assertNull(zkMetadataStore.get("randomKey".getBytes("UTF-8"))); + Assert.assertEquals(0, zkMetadataStore.all().size()); + } + + @Test + public void testMultipleUpdatesForSameKey() throws Exception { + byte[] key = "test-key1".getBytes("UTF-8"); + byte[] value = "test-value1".getBytes("UTF-8"); + byte[] value1 = "test-value2".getBytes("UTF-8"); + zkMetadataStore.put(key, value); + zkMetadataStore.put(key, value1); + Assert.assertTrue(Arrays.equals(value1, zkMetadataStore.get(key))); + Assert.assertEquals(1, zkMetadataStore.all().size()); + } + + @Test + public void testAllEntries() throws Exception { + byte[] key = "test-key1".getBytes("UTF-8"); + byte[] key1 = "test-key2".getBytes("UTF-8"); + byte[] key2 = "test-key3".getBytes("UTF-8"); + byte[] value = "test-value1".getBytes("UTF-8"); + byte[] value1 = "test-value2".getBytes("UTF-8"); + byte[] value2 = "test-value3".getBytes("UTF-8"); + zkMetadataStore.put(key, value); + zkMetadataStore.put(key1, value1); + zkMetadataStore.put(key2, value2); + ImmutableMap<byte[], byte[]> expected = ImmutableMap.of(key, value, key1, value1, key2, value2); + Assert.assertEquals(expected.size(), zkMetadataStore.all().size()); + } +}
