This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new bb3721a SAMZA-2276: Add Metdata store putAll API
bb3721a is described below
commit bb3721a3a572e74799a08d2391f4fbd61e3377eb
Author: Daniel Nishimura <[email protected]>
AuthorDate: Tue Jul 30 10:02:54 2019 -0700
SAMZA-2276: Add Metdata store putAll API
Add a putsAll API to the metadata store API. This allows for a delayed
flush() call for underlying stores that require a flush() after write such as
Kafka.
Author: Daniel Nishimura <[email protected]>
Reviewers: mynameborat <[email protected]>
Closes #1112 from dnishimura/samza-2276-metadata-store-putall-api and
squashes the following commits:
830b5994 [Daniel Nishimura] Update javadoc for MetadataStore#putAll
e52d884c [Daniel Nishimura] SAMZA-2271: Add Metdata store putAll API
---
.../apache/samza/metadatastore/MetadataStore.java | 13 +++++++++-
.../metadatastore/CoordinatorStreamStore.java | 28 +++++++++++++++-------
.../NamespaceAwareCoordinatorStreamStore.java | 9 +++++++
.../metadatastore/TestCoordinatorStreamStore.java | 27 ++++++++++++++++++++-
.../TestNamespaceAwareCoordinatorStreamStore.java | 23 ++++++++++++++++++
5 files changed, 90 insertions(+), 10 deletions(-)
diff --git
a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index 9009a65..1358aa5 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -18,8 +18,8 @@
*/
package org.apache.samza.metadatastore;
-import org.apache.samza.annotation.InterfaceStability;
import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
/**
* Store abstraction responsible for managing the metadata of a Samza job.
@@ -50,6 +50,17 @@ public interface MetadataStore {
void put(String key, byte[] value);
/**
+ * Updates the mapping with the specified map. This is not guaranteed to be
atomic.
+ *
+ * @param entries mapping of key to values to write to the metadata store
+ */
+ default void putAll(Map<String, byte[]> entries) {
+ for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
* Deletes the mapping for the specified {@code key} from this metadata
store (if such mapping exists).
*
* @param key the key for which the mapping is to be deleted.
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 4752cdc..e01a4c6 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -23,11 +23,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Map;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -38,16 +38,16 @@ import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
-import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
@@ -126,6 +126,19 @@ public class CoordinatorStreamStore implements
MetadataStore {
@Override
public void put(String namespacedKey, byte[] value) {
+ putWithoutFlush(namespacedKey, value);
+ flush();
+ }
+
+ @Override
+ public void putAll(Map<String, byte[]> entries) {
+ for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
+ putWithoutFlush(entry.getKey(), entry.getValue());
+ }
+ flush();
+ }
+
+ private void putWithoutFlush(String namespacedKey, byte[] value) {
// 1. Store the namespace and key into correct fields of the
CoordinatorStreamKey and convert the key to bytes.
CoordinatorMessageKey coordinatorMessageKey =
deserializeCoordinatorMessageKeyFromJson(namespacedKey);
CoordinatorStreamKeySerde keySerde = new
CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
@@ -134,7 +147,6 @@ public class CoordinatorStreamStore implements
MetadataStore {
// 2. Set the key, message in correct fields of {@link
OutgoingMessageEnvelope} and publish it to the coordinator stream.
OutgoingMessageEnvelope envelope = new
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keyBytes, value);
systemProducer.send(SOURCE, envelope);
- flush();
}
@Override
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
index 8a28fae..f4cd527 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.CoordinatorMessageKey;
import org.apache.samza.metadatastore.MetadataStore;
@@ -64,6 +65,14 @@ public class NamespaceAwareCoordinatorStreamStore implements
MetadataStore {
}
@Override
+ public void putAll(Map<String, byte[]> entries) {
+ Map<String, byte[]> mapWithCoordinatorMessageKeys =
+ entries.entrySet().stream()
+ .collect(Collectors.toMap(e ->
getCoordinatorMessageKey(e.getKey()), e -> e.getValue()));
+ metadataStore.putAll(mapWithCoordinatorMessageKeys);
+ }
+
+ @Override
public void delete(String key) {
String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
metadataStore.delete(coordinatorMessageKeyAsJson);
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index d5da781..e073a0e 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -19,6 +19,7 @@
package org.apache.samza.coordinator.metadatastore;
import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
@@ -27,7 +28,7 @@ import org.apache.samza.serializers.Serde;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.*;
+import org.mockito.Mockito;
public class TestCoordinatorStreamStore {
@@ -84,6 +85,30 @@ public class TestCoordinatorStreamStore {
}
@Test
+ public void testPutAll() {
+ CoordinatorStreamStore spyCoordinatorStreamStore =
Mockito.spy(coordinatorStreamStore);
+ String key1 = getCoordinatorMessageKey("test-key1");
+ String key2 = getCoordinatorMessageKey("test-key2");
+ String key3 = getCoordinatorMessageKey("test-key3");
+ String key4 = getCoordinatorMessageKey("test-key4");
+ String key5 = getCoordinatorMessageKey("test-key5");
+ byte[] value1 = getValue("test-value1");
+ byte[] value2 = getValue("test-value2");
+ byte[] value3 = getValue("test-value3");
+ byte[] value4 = getValue("test-value4");
+ byte[] value5 = getValue("test-value5");
+ ImmutableMap<String, byte[]> map =
+ ImmutableMap.of(key1, value1, key2, value2, key3, value3, key4,
value4, key5, value5);
+ spyCoordinatorStreamStore.putAll(map);
+ Assert.assertEquals(value1, spyCoordinatorStreamStore.get(key1));
+ Assert.assertEquals(value2, spyCoordinatorStreamStore.get(key2));
+ Assert.assertEquals(value3, spyCoordinatorStreamStore.get(key3));
+ Assert.assertEquals(value4, spyCoordinatorStreamStore.get(key4));
+ Assert.assertEquals(value5, spyCoordinatorStreamStore.get(key5));
+ Mockito.verify(spyCoordinatorStreamStore).flush(); // verify flush called
only once during putAll
+ }
+
+ @Test
public void testAllEntries() {
String key = getCoordinatorMessageKey("test-key1");
String key1 = getCoordinatorMessageKey("test-key2");
diff --git
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
index 06197f6..6536246 100644
---
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
+++
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
@@ -63,6 +63,29 @@ public class TestNamespaceAwareCoordinatorStreamStore {
}
@Test
+ public void testPutAllShouldDelegateTheInvocationToUnderlyingStore() {
+ byte[] value =
RandomStringUtils.randomAlphabetic(5).getBytes(StandardCharsets.UTF_8);
+ ImmutableMap<String, byte[]> map = ImmutableMap.of(
+ "key1", value,
+ "key2", value,
+ "key3", value,
+ "key4", value,
+ "key5", value);
+ ImmutableMap<String, byte[]> namespacedMap = ImmutableMap.of(
+ CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace,
"key1"), value,
+ CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace,
"key2"), value,
+ CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace,
"key3"), value,
+ CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace,
"key4"), value,
+ CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace,
"key5"), value);
+
+ Mockito.doNothing().when(coordinatorStreamStore).putAll(namespacedMap);
+
+ namespaceAwareCoordinatorStreamStore.putAll(map);
+
+ Mockito.verify(coordinatorStreamStore).putAll(namespacedMap);
+ }
+
+ @Test
public void testDeleteShouldDelegateTheInvocationToUnderlyingStore() {
String namespacedKey =
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);