Repository: samza Updated Branches: refs/heads/master 42b187a21 -> 991ed99aa
SAMZA-1726: Isolate InMemorySystemFactory to run separately per job Tested by running the corresponding integration and unit tests Author: sanil15 <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #532 from Sanil15/SAMZA-1726 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/991ed99a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/991ed99a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/991ed99a Branch: refs/heads/master Commit: 991ed99aac4ee56d82002afb030dcabd5e75e83c Parents: 42b187a Author: sanil15 <[email protected]> Authored: Fri Jun 22 12:44:57 2018 -0700 Committer: xiliu <[email protected]> Committed: Fri Jun 22 12:44:57 2018 -0700 ---------------------------------------------------------------------- .../samza/config/InMemorySystemConfig.java | 52 ++++++++++++++++++++ .../system/inmemory/InMemorySystemFactory.java | 15 ++++-- 2 files changed, 63 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/991ed99a/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java new file mode 100644 index 0000000..b46ac41 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +/** + * A convenience class for fetching configs related to the {@link org.apache.samza.system.inmemory.InMemorySystemFactory} + */ +public class InMemorySystemConfig extends MapConfig { + /** + * <p>This Config determines Runtime behaviour of {@link org.apache.samza.system.inmemory.InMemorySystemFactory} </p> + * <p> + * If {@code INMEMORY_SCOPE} key is configured with a non null value in the configs, it creates an isolated + * InMemorySystem identified by the value of {@code INMEMORY_SCOPE} in {@link org.apache.samza.system.inmemory.InMemorySystemFactory} + * for the app while runtime. All the in memory streams (input/output/intermediate) are created using this isolated + * InMemorySystem. + * </p> + * <p> + * If {@code INMEMORY_SCOPE} key is not configured or is null for an app, it shares a default InMemorySystem + * identified by {@code DEFAULT_INMEMORY_SCOPE} value of {@code INMEMORY_SCOPE} + * This system is shared between all the applications missing {@code INMEMORY_SCOPE} key in their configs running + * in the same JVM using {@link org.apache.samza.system.inmemory.InMemorySystemFactory} + * </p> + */ + public static final String INMEMORY_SCOPE = "inmemory.scope"; + + public static final String DEFAULT_INMEMORY_SCOPE = "SAME_DEFAULT_SCOPE"; + + public InMemorySystemConfig(Config config) { + super(config); + } + + public String getInMemoryScope() { + return this.get(INMEMORY_SCOPE) == null ? DEFAULT_INMEMORY_SCOPE : this.get(INMEMORY_SCOPE); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/991ed99a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java index f78b7f4..d534ca9 100644 --- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java +++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java @@ -19,7 +19,9 @@ package org.apache.samza.system.inmemory; +import java.util.concurrent.ConcurrentHashMap; import org.apache.samza.config.Config; +import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemConsumer; @@ -31,20 +33,25 @@ import org.apache.samza.system.SystemProducer; * Initial draft of in-memory {@link SystemFactory}. It is test only and not meant for production use right now. */ public class InMemorySystemFactory implements SystemFactory { - private static final InMemoryManager MEMORY_MANAGER = new InMemoryManager(); + private static final ConcurrentHashMap<String, InMemoryManager> IN_MEMORY_MANAGERS = new ConcurrentHashMap<>(); @Override public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { - return new InMemorySystemConsumer(MEMORY_MANAGER); + return new InMemorySystemConsumer(getOrDefaultInMemoryManagerByTestId(config)); } @Override public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { - return new InMemorySystemProducer(systemName, MEMORY_MANAGER); + return new InMemorySystemProducer(systemName, getOrDefaultInMemoryManagerByTestId(config)); } @Override public SystemAdmin getAdmin(String systemName, Config config) { - return new InMemorySystemAdmin(MEMORY_MANAGER); + return new InMemorySystemAdmin(getOrDefaultInMemoryManagerByTestId(config)); + } + + private InMemoryManager getOrDefaultInMemoryManagerByTestId(Config config) { + InMemorySystemConfig inMemorySystemConfig = new InMemorySystemConfig(config); + return IN_MEMORY_MANAGERS.computeIfAbsent(inMemorySystemConfig.getInMemoryScope(), key -> new InMemoryManager()); } }
