Repository: samza
Updated Branches:
  refs/heads/master e25e0dab9 -> 5069f1ddb


http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
new file mode 100644
index 0000000..1f2d586
--- /dev/null
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.apache.samza.table.ReadableTable;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestLocalReadableTable { // Tests LocalReadableTable reads and the metrics they record, over a mocked KeyValueStore.
+
+  public static final String TABLE_ID = "t1"; // prefix of the metric names stubbed in setUp()
+
+  private List<String> keys; // keys passed to getAll()/getAllAsync()
+  private Map<String, String> values; // what the mocked store returns for getAll(keys)
+
+  private Timer getNs; // real Timer/Counter objects (not mocks) so the tests can read what was recorded
+  private Timer getAllNs;
+  private Counter numGets;
+  private Counter numGetAlls;
+  private Timer getCallbackNs;
+  private Counter numMissedLookups;
+
+  private MetricsRegistry metricsRegistry; // mock; stubbed per metric name in setUp()
+
+  private KeyValueStore kvStore; // mock backing store
+
+  @Before
+  public void setUp() { // rebuilds test data, the store mock and the metrics registry mock before every test
+    keys = Arrays.asList("k1", "k2", "k3");
+
+    values = new HashMap<>();
+    values.put("k1", "v1");
+    values.put("k2", "v2");
+    values.put("k3", null); // k3 maps to null in the getAll() result
+
+    kvStore = mock(KeyValueStore.class);
+    when(kvStore.get("k1")).thenReturn("v1");
+    when(kvStore.get("k2")).thenReturn("v2"); // get("k3") is left unstubbed, so the mock answers null
+    when(kvStore.getAll(keys)).thenReturn(values);
+
+    getNs = new Timer("");
+    getAllNs = new Timer("");
+    numGets = new Counter("");
+    numGetAlls = new Counter("");
+    getCallbackNs = new Timer("");
+    numMissedLookups = new Counter("");
+
+    metricsRegistry = mock(MetricsRegistry.class);
+    String groupName = LocalReadableTable.class.getSimpleName(); // group under which the metrics below are stubbed
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + 
"-num-gets")).thenReturn(numGets);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + 
"-num-getAlls")).thenReturn(numGetAlls);
+    when(metricsRegistry.newCounter(groupName, TABLE_ID + 
"-num-missed-lookups")).thenReturn(numMissedLookups);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + 
"-get-ns")).thenReturn(getNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + 
"-getAll-ns")).thenReturn(getAllNs);
+    when(metricsRegistry.newTimer(groupName, TABLE_ID + 
"-get-callback-ns")).thenReturn(getCallbackNs);
+  }
+
+  @Test
+  public void testGet() throws Exception { // single-key reads: sync, async, and a missing key
+    ReadableTable table = createTable(false); // no timer-disabling config is set
+    Assert.assertEquals("v1", table.get("k1"));
+    Assert.assertEquals("v2", table.getAsync("k2").get()); // async variant; get() on the result yields the value
+    Assert.assertNull(table.get("k3"));
+    verify(kvStore, times(3)).get(any()); // one store get() per lookup above
+    Assert.assertEquals(3, numGets.getCount());
+    Assert.assertEquals(1, numMissedLookups.getCount()); // only the k3 lookup returned null
+    Assert.assertTrue(getNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGetAlls.getCount()); // getAll metrics must stay untouched by get()
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testGetAll() throws Exception { // multi-key reads: sync, async, and an empty key list
+    ReadableTable table = createTable(false);
+    Assert.assertEquals(values, table.getAll(keys));
+    Assert.assertEquals(values, table.getAllAsync(keys).get());
+    verify(kvStore, times(2)).getAll(any()); // checked before the empty-keys call below
+    Assert.assertEquals(Collections.emptyMap(), 
table.getAll(Collections.emptyList())); // empty key list yields an empty map
+    Assert.assertEquals(2, numMissedLookups.getCount()); // presumably one miss (k3 -> null) per non-empty getAll; confirm against LocalReadableTable
+    Assert.assertEquals(3, numGetAlls.getCount()); // two non-empty calls plus the empty one
+    Assert.assertTrue(getAllNs.getSnapshot().getAverage() > 0);
+    Assert.assertEquals(0, numGets.getCount()); // single-get metrics must stay untouched by getAll()
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  @Test
+  public void testTimerDisabled() throws Exception { // with timers disabled, counters are still used but no timer is created
+    ReadableTable table = createTable(true); // sets the timer-enabled config to "false"
+    table.get("");
+    table.getAsync("").get();
+    table.getAll(Collections.emptyList());
+    table.getAllAsync(Collections.emptyList()).get();
+    verify(metricsRegistry, atLeast(1)).newCounter(anyString(), anyString());
+    verify(metricsRegistry, times(0)).newTimer(anyString(), anyString()); // no timer may be requested from the registry
+    verify(metricsRegistry, times(0)).newGauge(anyString(), any());
+    Assert.assertEquals(2, numGets.getCount()); // get("") and getAsync("")
+    Assert.assertEquals(2, numMissedLookups.getCount()); // get("") is unstubbed on the mock, so both lookups return null
+    Assert.assertEquals(2, numGetAlls.getCount()); // getAll and getAllAsync
+    Assert.assertEquals(0, getNs.getSnapshot().getAverage(), 0.001); // the timer objects from setUp() must have recorded nothing
+    Assert.assertEquals(0, getAllNs.getSnapshot().getAverage(), 0.001);
+    Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
+  }
+
+  private LocalReadableTable createTable(boolean isTimerDisabled) { // builds and initializes a table over the mocked store and contexts
+    Map<String, String> config = new HashMap<>();
+    if (isTimerDisabled) {
+      config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false"); // otherwise the config stays empty
+    }
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig(config));
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    
when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry); // expose the mocked registry through the container context
+
+    LocalReadableTable table =  new LocalReadableTable("t1", kvStore); // NOTE(review): literal "t1" duplicates TABLE_ID, which setUp() uses for the metric names
+    table.init(context);
+
+    return table;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
new file mode 100644
index 0000000..5367931
--- /dev/null
+++ 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import junit.framework.Assert;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.context.TaskContext;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestLocalTableProvider { // Tests LocalTableProvider.getTable() with and without a prior init().
+
+  @Test
+  public void testInit() { // after init(context) with mocked contexts, getTable() must return non-null
+    Context context = mock(Context.class);
+    JobContext jobContext = mock(JobContext.class);
+    when(context.getJobContext()).thenReturn(jobContext);
+    when(jobContext.getConfig()).thenReturn(new MapConfig()); // empty job config
+    ContainerContext containerContext = mock(ContainerContext.class);
+    when(context.getContainerContext()).thenReturn(containerContext);
+    when(containerContext.getContainerMetricsRegistry()).thenReturn(new 
NoOpMetricsRegistry()); // metrics registry is supplied through the container context
+    TaskContext taskContext = mock(TaskContext.class);
+    when(context.getTaskContext()).thenReturn(taskContext);
+    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class)); // any store name resolves to a mocked KeyValueStore
+
+    TableProvider tableProvider = createTableProvider("t1");
+    tableProvider.init(context);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInitFail() { // getTable() without a prior init() is expected to throw NullPointerException
+    TableProvider tableProvider = createTableProvider("t1");
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  private TableProvider createTableProvider(String tableId) { // returns a provider for tableId that has not been initialized
+    return new LocalTableProvider(tableId) { // anonymous subclass with no overrides
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
 
b/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
deleted file mode 100644
index 752b91e..0000000
--- 
a/samza-kv/src/test/java/org/apache/samza/storage/kv/descriptors/TestLocalTableProvider.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv.descriptors;
-
-import junit.framework.Assert;
-import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalTableProvider;
-import org.apache.samza.table.TableProvider;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Test;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-
-public class TestLocalTableProvider {
-
-  @Test
-  public void testInit() {
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    when(taskContext.getStore(any())).thenReturn(mock(KeyValueStore.class));
-    when(taskContext.getTaskMetricsRegistry()).thenReturn(new 
NoOpMetricsRegistry());
-
-    TableProvider tableProvider = createTableProvider("t1");
-    tableProvider.init(context);
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testInitFail() {
-    TableProvider tableProvider = createTableProvider("t1");
-    Assert.assertNotNull(tableProvider.getTable());
-  }
-
-  private TableProvider createTableProvider(String tableId) {
-    return new LocalTableProvider(tableId) {
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 80cb789..89a32d8 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -21,11 +21,9 @@ package org.apache.samza.sql.impl;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index fa279f2..e112804 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -20,11 +20,11 @@
 package org.apache.samza.test.table;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,12 +34,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.Context;
-import org.apache.samza.context.TaskContext;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Gauge;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
@@ -49,9 +44,6 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.storage.kv.LocalReadWriteTable;
 import 
org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.table.ReadWriteTable;
@@ -65,6 +57,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -74,16 +67,9 @@ import static 
org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
 import static 
org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 
 /**
@@ -357,50 +343,6 @@ public class TestLocalTable extends 
AbstractIntegrationTestHarness {
   }
 
   @Test
-  public void testAsyncOperation() throws Exception {
-    KeyValueStore kvStore = mock(KeyValueStore.class);
-    LocalReadWriteTable<String, String> table = new 
LocalReadWriteTable<>("table1", kvStore);
-    Context context = mock(Context.class);
-    TaskContext taskContext = mock(TaskContext.class);
-    when(context.getTaskContext()).thenReturn(taskContext);
-    MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
-    doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), 
anyString());
-    
doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), 
anyString());
-    doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), 
any());
-    doReturn(metricsRegistry).when(taskContext).getTaskMetricsRegistry();
-
-    table.init(context);
-
-    // GET
-    doReturn("bar").when(kvStore).get(anyString());
-    Assert.assertEquals("bar", table.getAsync("foo").get());
-
-    // GET-ALL
-    Map<String, String> recordMap = new HashMap<>();
-    recordMap.put("foo1", "bar1");
-    recordMap.put("foo2", "bar2");
-    doReturn(recordMap).when(kvStore).getAll(anyList());
-    Assert.assertEquals(recordMap, table.getAllAsync(Arrays.asList("foo1", 
"foo2")).get());
-
-    // PUT
-    table.putAsync("foo1", "bar1").get();
-    verify(kvStore, times(1)).put(anyString(), anyString());
-
-    // PUT-ALL
-    List<Entry<String, String>> records = Arrays.asList(new Entry<>("foo1", 
"bar1"), new Entry<>("foo2", "bar2"));
-    table.putAllAsync(records).get();
-    verify(kvStore, times(1)).putAll(anyList());
-
-    // DELETE
-    table.deleteAsync("foo").get();
-    verify(kvStore, times(1)).delete(anyString());
-
-    // DELETE-ALL
-    table.deleteAllAsync(Arrays.asList("foo1", "foo2")).get();
-    verify(kvStore, times(1)).deleteAll(anyList());
-  }
-
-  @Test
   public void testWithLowLevelApi() throws Exception {
 
     Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), 
zkConnect());

http://git-wip-us.apache.org/repos/asf/samza/blob/5069f1dd/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java 
b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
index 8218b8b..c9228af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java
@@ -20,6 +20,7 @@
 package org.apache.samza.test.table;
 
 import com.google.common.cache.CacheBuilder;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -33,6 +34,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -60,6 +62,7 @@ import org.apache.samza.table.remote.TableWriteFunction;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.Base64Serializer;
 import org.apache.samza.util.RateLimiter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -68,6 +71,7 @@ import static 
org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.generatePageViews;
 import static org.apache.samza.test.table.TestTableData.generateProfiles;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -248,7 +252,8 @@ public class TestRemoteTable extends 
AbstractIntegrationTestHarness {
     doReturn(new Counter("")).when(metricsRegistry).newCounter(anyString(), 
anyString());
     doReturn(new Timer("")).when(metricsRegistry).newTimer(anyString(), 
anyString());
     Context context = new MockContext();
-    
doReturn(metricsRegistry).when(context.getTaskContext()).getTaskMetricsRegistry();
+    doReturn(new MapConfig()).when(context.getJobContext()).getConfig();
+    
doReturn(metricsRegistry).when(context.getContainerContext()).getContainerMetricsRegistry();
     return context;
   }
 

Reply via email to