This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bb3721a  SAMZA-2276: Add Metdata store putAll API
bb3721a is described below

commit bb3721a3a572e74799a08d2391f4fbd61e3377eb
Author: Daniel Nishimura <[email protected]>
AuthorDate: Tue Jul 30 10:02:54 2019 -0700

    SAMZA-2276: Add Metdata store putAll API
    
    Add a putsAll API to the metadata store API. This allows for a delayed 
flush() call for underlying stores that require a flush() after write such as 
Kafka.
    
    Author: Daniel Nishimura <[email protected]>
    
    Reviewers: mynameborat <[email protected]>
    
    Closes #1112 from dnishimura/samza-2276-metadata-store-putall-api and 
squashes the following commits:
    
    830b5994 [Daniel Nishimura] Update javadoc for MetadataStore#putAll
    e52d884c [Daniel Nishimura] SAMZA-2271: Add Metdata store putAll API
---
 .../apache/samza/metadatastore/MetadataStore.java  | 13 +++++++++-
 .../metadatastore/CoordinatorStreamStore.java      | 28 +++++++++++++++-------
 .../NamespaceAwareCoordinatorStreamStore.java      |  9 +++++++
 .../metadatastore/TestCoordinatorStreamStore.java  | 27 ++++++++++++++++++++-
 .../TestNamespaceAwareCoordinatorStreamStore.java  | 23 ++++++++++++++++++
 5 files changed, 90 insertions(+), 10 deletions(-)

diff --git 
a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java 
b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
index 9009a65..1358aa5 100644
--- a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
+++ b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.metadatastore;
 
-import org.apache.samza.annotation.InterfaceStability;
 import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
 
 /**
  * Store abstraction responsible for managing the metadata of a Samza job.
@@ -50,6 +50,17 @@ public interface MetadataStore {
   void put(String key, byte[] value);
 
   /**
+   * Updates the mapping with the specified map. This is not guaranteed to be 
atomic.
+   *
+   * @param entries mapping of key to values to write to the metadata store
+   */
+  default void putAll(Map<String, byte[]> entries) {
+    for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
    * Deletes the mapping for the specified {@code key} from this metadata 
store (if such mapping exists).
    *
    * @param key the key for which the mapping is to be deleted.
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 4752cdc..e01a4c6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -23,11 +23,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -38,16 +38,16 @@ import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemProducer;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
-import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -126,6 +126,19 @@ public class CoordinatorStreamStore implements 
MetadataStore {
 
   @Override
   public void put(String namespacedKey, byte[] value) {
+    putWithoutFlush(namespacedKey, value);
+    flush();
+  }
+
+  @Override
+  public void putAll(Map<String, byte[]> entries) {
+    for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
+      putWithoutFlush(entry.getKey(), entry.getValue());
+    }
+    flush();
+  }
+
+  private void putWithoutFlush(String namespacedKey, byte[] value) {
     // 1. Store the namespace and key into correct fields of the 
CoordinatorStreamKey and convert the key to bytes.
     CoordinatorMessageKey coordinatorMessageKey = 
deserializeCoordinatorMessageKeyFromJson(namespacedKey);
     CoordinatorStreamKeySerde keySerde = new 
CoordinatorStreamKeySerde(coordinatorMessageKey.getNamespace());
@@ -134,7 +147,6 @@ public class CoordinatorStreamStore implements 
MetadataStore {
     // 2. Set the key, message in correct fields of {@link 
OutgoingMessageEnvelope} and publish it to the coordinator stream.
     OutgoingMessageEnvelope envelope = new 
OutgoingMessageEnvelope(coordinatorSystemStream, 0, keyBytes, value);
     systemProducer.send(SOURCE, envelope);
-    flush();
   }
 
   @Override
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
index 8a28fae..f4cd527 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/NamespaceAwareCoordinatorStreamStore.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore.CoordinatorMessageKey;
 import org.apache.samza.metadatastore.MetadataStore;
 
@@ -64,6 +65,14 @@ public class NamespaceAwareCoordinatorStreamStore implements 
MetadataStore {
   }
 
   @Override
+  public void putAll(Map<String, byte[]> entries) {
+    Map<String, byte[]> mapWithCoordinatorMessageKeys =
+        entries.entrySet().stream()
+            .collect(Collectors.toMap(e -> 
getCoordinatorMessageKey(e.getKey()), e -> e.getValue()));
+    metadataStore.putAll(mapWithCoordinatorMessageKeys);
+  }
+
+  @Override
   public void delete(String key) {
     String coordinatorMessageKeyAsJson = getCoordinatorMessageKey(key);
     metadataStore.delete(coordinatorMessageKeyAsJson);
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
index d5da781..e073a0e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -19,6 +19,7 @@
 package org.apache.samza.coordinator.metadatastore;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
@@ -27,7 +28,7 @@ import org.apache.samza.serializers.Serde;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import java.util.*;
+import org.mockito.Mockito;
 
 public class TestCoordinatorStreamStore {
 
@@ -84,6 +85,30 @@ public class TestCoordinatorStreamStore {
   }
 
   @Test
+  public void testPutAll() {
+    CoordinatorStreamStore spyCoordinatorStreamStore = 
Mockito.spy(coordinatorStreamStore);
+    String key1 = getCoordinatorMessageKey("test-key1");
+    String key2 = getCoordinatorMessageKey("test-key2");
+    String key3 = getCoordinatorMessageKey("test-key3");
+    String key4 = getCoordinatorMessageKey("test-key4");
+    String key5 = getCoordinatorMessageKey("test-key5");
+    byte[] value1 = getValue("test-value1");
+    byte[] value2 = getValue("test-value2");
+    byte[] value3 = getValue("test-value3");
+    byte[] value4 = getValue("test-value4");
+    byte[] value5 = getValue("test-value5");
+    ImmutableMap<String, byte[]> map =
+        ImmutableMap.of(key1, value1, key2, value2, key3, value3, key4, 
value4, key5, value5);
+    spyCoordinatorStreamStore.putAll(map);
+    Assert.assertEquals(value1, spyCoordinatorStreamStore.get(key1));
+    Assert.assertEquals(value2, spyCoordinatorStreamStore.get(key2));
+    Assert.assertEquals(value3, spyCoordinatorStreamStore.get(key3));
+    Assert.assertEquals(value4, spyCoordinatorStreamStore.get(key4));
+    Assert.assertEquals(value5, spyCoordinatorStreamStore.get(key5));
+    Mockito.verify(spyCoordinatorStreamStore).flush(); // verify flush called 
only once during putAll
+  }
+
+  @Test
   public void testAllEntries() {
     String key = getCoordinatorMessageKey("test-key1");
     String key1 = getCoordinatorMessageKey("test-key2");
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
index 06197f6..6536246 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestNamespaceAwareCoordinatorStreamStore.java
@@ -63,6 +63,29 @@ public class TestNamespaceAwareCoordinatorStreamStore {
   }
 
   @Test
+  public void testPutAllShouldDelegateTheInvocationToUnderlyingStore() {
+    byte[] value = 
RandomStringUtils.randomAlphabetic(5).getBytes(StandardCharsets.UTF_8);
+    ImmutableMap<String, byte[]> map = ImmutableMap.of(
+        "key1", value,
+        "key2", value,
+        "key3", value,
+        "key4", value,
+        "key5", value);
+    ImmutableMap<String, byte[]> namespacedMap = ImmutableMap.of(
+        CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, 
"key1"), value,
+        CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, 
"key2"), value,
+        CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, 
"key3"), value,
+        CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, 
"key4"), value,
+        CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, 
"key5"), value);
+
+    Mockito.doNothing().when(coordinatorStreamStore).putAll(namespacedMap);
+
+    namespaceAwareCoordinatorStreamStore.putAll(map);
+
+    Mockito.verify(coordinatorStreamStore).putAll(namespacedMap);
+  }
+
+  @Test
   public void testDeleteShouldDelegateTheInvocationToUnderlyingStore() {
     String namespacedKey = 
CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(namespace, KEY1);
 

Reply via email to