PanTheMan commented on a change in pull request #1352:
URL: https://github.com/apache/samza/pull/1352#discussion_r416919124



##########
File path: 
samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.util.HighResolutionClock;
+import org.apache.samza.util.ScalaJavaUtil;
+
+
+/**
+ * This encapsulates all the steps needed to create a key value storage engine.
+ * This is meant to be extended by the specific key value store factory 
implementations which will in turn override the
+ * getKVStore method to return a raw key-value store.
+ */
+public abstract class BaseKeyValueStorageEngineFactory<K, V> implements 
StorageEngineFactory<K, V> {
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+      
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
+
+  /**
+   * Implement this to return a KeyValueStore instance for the given store 
name, which will be used as the underlying
+   * raw store.
+   *
+   * @param storeName Name of the store
+   * @param storeDir The directory of the store
+   * @param registry MetricsRegistry to which to publish store specific 
metrics.
+   * @param changeLogSystemStreamPartition Samza stream partition from which 
to receive the changelog.
+   * @param jobContext Information about the job in which the task is 
executing.
+   * @param containerContext Information about the container in which the task 
is executing.
+   * @return A raw KeyValueStore instance
+   */
+  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
+      File storeDir,
+      MetricsRegistry registry,
+      SystemStreamPartition changeLogSystemStreamPartition,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode);
+
+  /**
+   * Constructs a key-value StorageEngine and returns it to the caller
+   *
+   * @param storeName The name of the storage engine.
+   * @param storeDir The directory of the storage engine.
+   * @param keySerde The serializer to use for serializing keys when reading 
or writing to the store.
+   * @param msgSerde The serializer to use for serializing messages when 
reading or writing to the store.
+   * @param changelogCollector MessageCollector the storage engine uses to 
persist changes.
+   * @param registry MetricsRegistry to which to publish storage-engine 
specific metrics.
+   * @param changelogSSP Samza system stream partition from which to receive 
the changelog.
+   * @param containerContext Information about the container in which the task 
is executing.
+   **/
+  public StorageEngine getStorageEngine(String storeName,
+      File storeDir,
+      Serde<K> keySerde,
+      Serde<V> msgSerde,
+      MessageCollector changelogCollector,
+      MetricsRegistry registry,
+      SystemStreamPartition changelogSSP,
+      JobContext jobContext,
+      ContainerContext containerContext,
+      StoreMode storeMode) {
+    Config storageConfigSubset = jobContext.getConfig().subset("stores." + 
storeName + ".", true);
+    StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
+    Optional<String> storeFactory = 
storageConfig.getStorageFactoryClassName(storeName);
+    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new 
StoreProperties.StorePropertiesBuilder();
+    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with 
KV store creation!");
+    }
+    if (!storeFactory.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder.setPersistedToDisk(true);
+    }
+    int batchSize = storageConfigSubset.getInt("write.batch.size", 500);
+    int cacheSize = storageConfigSubset.getInt("object.cache.size", 
Math.max(batchSize, 1000));
+    if (cacheSize > 0 && cacheSize < batchSize) {
+      throw new SamzaException(
+          "A store's cache.size cannot be less than batch.size as batched 
values reside in cache.");
+    }
+    if (keySerde == null) {
+      throw new SamzaException("Must define a key serde when using key value 
storage.");
+    }
+    if (msgSerde == null) {
+      throw new SamzaException("Must define a message serde when using key 
value storage.");
+    }
+
+    KeyValueStore<byte[], byte[]> rawStore =
+        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, 
containerContext, storeMode);
+    KeyValueStore<byte[], byte[]> maybeLoggedStore = 
buildMaybeLoggedStore(changelogSSP,
+        storeName, registry, storePropertiesBuilder, rawStore, 
changelogCollector);
+    // this also applies serialization and caching layers
+    KeyValueStore<K, V> toBeAccessLoggedStore = 
applyLargeMessageHandling(storeName, registry,
+        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, 
msgSerde);
+    KeyValueStore<K, V> maybeAccessLoggedStore =
+        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, 
changelogCollector, changelogSSP, storageConfig,
+            keySerde);
+    KeyValueStore<K, V> nullSafeStore = new 
NullSafeKeyValueStore<>(maybeAccessLoggedStore);
+
+    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new 
KeyValueStorageEngineMetrics(storeName, registry);
+    HighResolutionClock clock = buildClock(jobContext.getConfig());
+    return new KeyValueStorageEngine<>(storeName, storeDir, 
storePropertiesBuilder.build(), nullSafeStore, rawStore,
+        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, 
batchSize,
+        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
+  }
+
+  private static KeyValueStore<byte[], byte[]> 
buildMaybeLoggedStore(SystemStreamPartition changelogSSP,
+      String storeName,
+      MetricsRegistry registry,
+      StoreProperties.StorePropertiesBuilder storePropertiesBuilder,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      MessageCollector changelogCollector) {
+    if (changelogSSP == null) {
+      return storeToWrap;
+    } else {
+      LoggedStoreMetrics loggedStoreMetrics = new 
LoggedStoreMetrics(storeName, registry);
+      storePropertiesBuilder.setLoggedStore(true);
+      return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, 
loggedStoreMetrics);
+    }
+  }
+
+  private static <T, U> KeyValueStore<T, U> applyLargeMessageHandling(String 
storeName,
+      MetricsRegistry registry,
+      KeyValueStore<byte[], byte[]> storeToWrap,
+      StorageConfig storageConfig,
+      int cacheSize,
+      int batchSize,
+      Serde<T> keySerde,
+      Serde<U> msgSerde) {
+    int maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName);
+    if (storageConfig.getDisallowLargeMessages(storeName)) {
+      /*
+       * If large messages are disallowed in config, then this creates a 
LargeMessageSafeKeyValueStore that throws a
+       * RecordTooLargeException when a large message is encountered.
+       */
+      KeyValueStore<byte[], byte[]> maybeCachedStore =
+          buildMaybeCachedStore(storeName, registry, storeToWrap, cacheSize, 
batchSize);
+      LargeMessageSafeStore largeMessageSafeKeyValueStore =
+          new LargeMessageSafeStore(maybeCachedStore, storeName, false, 
maxMessageSize);
+      return buildSerializedStore(storeName, registry, 
largeMessageSafeKeyValueStore, keySerde, msgSerde);
+    } else {
+      KeyValueStore<byte[], byte[]> toBeSerializedStore;
+      if (storageConfig.getDropLargeMessages(storeName)) {
+        toBeSerializedStore = new LargeMessageSafeStore(storeToWrap, 
storeName, true, maxMessageSize);
+      } else {
+        toBeSerializedStore = storeToWrap;
+      }
+      KeyValueStore<T, U> serializedStore =
+          buildSerializedStore(storeName, registry, toBeSerializedStore, 
keySerde, msgSerde);
+      return buildMaybeCachedStore(storeName, registry, serializedStore, 
cacheSize, batchSize);

Review comment:
       Is there a meaningful difference in the order in which the stores are 
wrapped when large messages are disallowed vs. allowed? I.e., in the if 
branch, you create the maybeCachedStore first, then the LargeMessageSafeStore, 
and then the SerializedStore, whereas in the else branch, you create the 
LargeMessageSafeStore first, then the SerializedStore, and then the maybeCachedStore.
   Also, if the order doesn't matter, couldn't we refactor this section of the 
code into a function that takes the **getDisallowLargeMessages** value as a 
parameter and returns the final store?

##########
File path: 
samza-kv/src/test/java/org/apache/samza/storage/kv/TestBaseKeyValueStorageEngineFactory.java
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.io.File;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.StorageEngineFactory;
+import org.apache.samza.storage.StoreProperties;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestBaseKeyValueStorageEngineFactory {
+  private static final String STORE_NAME = "myStore";
+  private static final StorageEngineFactory.StoreMode STORE_MODE = 
StorageEngineFactory.StoreMode.ReadWrite;
+  private static final SystemStreamPartition CHANGELOG_SSP =
+      new SystemStreamPartition("system", "stream", new Partition(0));
+  private static final Map<String, String> BASE_CONFIG =
+      ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
+          MockKeyValueStorageEngineFactory.class.getName());
+  private static final Map<String, String> DISABLE_CACHE =
+      ImmutableMap.of(String.format("stores.%s.object.cache.size", 
STORE_NAME), "0");
+  private static final Map<String, String> DISALLOW_LARGE_MESSAGES =
+      ImmutableMap.of(String.format(StorageConfig.DISALLOW_LARGE_MESSAGES, 
STORE_NAME), "true");
+  private static final Map<String, String> DROP_LARGE_MESSAGES =
+      ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, 
STORE_NAME), "true");
+  private static final Map<String, String> ACCESS_LOG_ENABLED =
+      ImmutableMap.of(String.format("stores.%s.accesslog.enabled", 
STORE_NAME), "true");
+
+  @Mock
+  private File storeDir;
+  @Mock
+  private Serde<String> keySerde;
+  @Mock
+  private Serde<String> msgSerde;
+  @Mock
+  private MessageCollector changelogCollector;
+  @Mock
+  private MetricsRegistry metricsRegistry;
+  @Mock
+  private JobContext jobContext;
+  @Mock
+  private ContainerContext containerContext;
+  @Mock
+  private KeyValueStore<byte[], byte[]> rawKeyValueStore;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    // some metrics objects need this for histogram metric instantiation
+    when(this.metricsRegistry.newGauge(any(), 
any())).thenReturn(mock(Gauge.class));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testMissingStoreFactory() {
+    Config config = new MapConfig();
+    callGetStorageEngine(config, null);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testInvalidCacheSize() {
+    Config config = new MapConfig(BASE_CONFIG,
+        ImmutableMap.of(String.format("stores.%s.write.cache.batch", 
STORE_NAME), "100",
+            String.format("stores.%s.object.cache.size", STORE_NAME), "50"));
+    callGetStorageEngine(config, null);
+  }
+
+  @Test
+  public void testInMemoryKeyValueStore() {
+    Config config = new MapConfig(DISABLE_CACHE, 
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
+        
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"));
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), false, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    // config has the in-memory key-value factory, but still calling the test 
factory, so store will be the test store
+    assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
+  }
+
+  @Test
+  public void testRawStoreOnly() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
+  }
+
+  @Test
+  public void testWithLoggedStore() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
+    StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
true);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    LoggedStore<?, ?> loggedStore = 
assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
+    // noinspection AssertEqualsBetweenInconvertibleTypes
+    assertEquals(this.rawKeyValueStore, loggedStore.getStore());
+  }
+
+  @Test
+  public void testWithLoggedStoreWithCache() {
+    Config config = new MapConfig(BASE_CONFIG);
+    StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
true);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    CachedStore<?, ?> cachedStore = 
assertAndCast(nullSafeKeyValueStore.getStore(), CachedStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(cachedStore.getStore(), SerializedKeyValueStore.class);
+    LoggedStore<?, ?> loggedStore = 
assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
+    // noinspection AssertEqualsBetweenInconvertibleTypes
+    assertEquals(this.rawKeyValueStore, loggedStore.getStore());
+  }
+
+  @Test
+  public void testDisallowLargeMessages() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, 
DISALLOW_LARGE_MESSAGES);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    LargeMessageSafeStore largeMessageSafeStore =
+        assertAndCast(serializedKeyValueStore.getStore(), 
LargeMessageSafeStore.class);
+    assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
+  }
+
+  @Test
+  public void testDisallowLargeMessagesWithCache() {
+    Config config = new MapConfig(BASE_CONFIG, DISALLOW_LARGE_MESSAGES);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    LargeMessageSafeStore largeMessageSafeStore =
+        assertAndCast(serializedKeyValueStore.getStore(), 
LargeMessageSafeStore.class);
+    CachedStore<?, ?> cachedStore = 
assertAndCast(largeMessageSafeStore.getStore(), CachedStore.class);
+    // noinspection AssertEqualsBetweenInconvertibleTypes
+    assertEquals(this.rawKeyValueStore, cachedStore.getStore());
+  }
+
+  @Test
+  public void testDropLargeMessages() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, 
DROP_LARGE_MESSAGES);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
SerializedKeyValueStore.class);
+    LargeMessageSafeStore largeMessageSafeStore =
+        assertAndCast(serializedKeyValueStore.getStore(), 
LargeMessageSafeStore.class);
+    assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
+  }
+
+  @Test
+  public void testDropLargeMessagesWithCache() {
+    Config config = new MapConfig(BASE_CONFIG, DROP_LARGE_MESSAGES);
+    StorageEngine storageEngine = callGetStorageEngine(config, null);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
false);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    CachedStore<?, ?> cachedStore = 
assertAndCast(nullSafeKeyValueStore.getStore(), CachedStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(cachedStore.getStore(), SerializedKeyValueStore.class);
+    LargeMessageSafeStore largeMessageSafeStore =
+        assertAndCast(serializedKeyValueStore.getStore(), 
LargeMessageSafeStore.class);
+    assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
+  }
+
+  @Test
+  public void testAccessLogStore() {
+    Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, 
ACCESS_LOG_ENABLED);
+    // AccessLoggedStore requires a changelog SSP
+    StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
baseStorageEngineValidation(storageEngine);
+    assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, 
true);
+    NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
+        assertAndCast(keyValueStorageEngine.getWrapperStore(), 
NullSafeKeyValueStore.class);
+    AccessLoggedStore<?, ?> accessLoggedStore =
+        assertAndCast(nullSafeKeyValueStore.getStore(), 
AccessLoggedStore.class);
+    SerializedKeyValueStore<?, ?> serializedKeyValueStore =
+        assertAndCast(accessLoggedStore.getStore(), 
SerializedKeyValueStore.class);
+    LoggedStore<?, ?> loggedStore = 
assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
+    // noinspection AssertEqualsBetweenInconvertibleTypes
+    assertEquals(this.rawKeyValueStore, loggedStore.getStore());
+  }
+
+  private static <T extends KeyValueStore<?, ?>> T 
assertAndCast(KeyValueStore<?, ?> keyValueStore, Class<T> clazz) {
+    assertTrue("Expected type " + clazz.getName(), 
clazz.isInstance(keyValueStore));
+    return clazz.cast(keyValueStore);
+  }
+
+  private KeyValueStorageEngine<?, ?> 
baseStorageEngineValidation(StorageEngine storageEngine) {
+    assertTrue(storageEngine instanceof KeyValueStorageEngine);
+    KeyValueStorageEngine<?, ?> keyValueStorageEngine = 
(KeyValueStorageEngine<?, ?>) storageEngine;
+    assertEquals(this.rawKeyValueStore, keyValueStorageEngine.getRawStore());
+    return keyValueStorageEngine;
+  }
+
+  private static void assertStoreProperties(StoreProperties storeProperties, 
boolean expectedPersistedToDisk,
+      boolean expectedLoggedStore) {
+    assertEquals(expectedPersistedToDisk, storeProperties.isPersistedToDisk());
+    assertEquals(expectedLoggedStore, storeProperties.isLoggedStore());
+  }
+
+  /**
+   * @param changelogSSP if non-null, then enables logged store
+   */
+  private StorageEngine callGetStorageEngine(Config config, 
SystemStreamPartition changelogSSP) {
+    when(this.jobContext.getConfig()).thenReturn(config);
+    return new 
MockKeyValueStorageEngineFactory(this.rawKeyValueStore).getStorageEngine(STORE_NAME,
 this.storeDir,
+        this.keySerde, this.msgSerde, this.changelogCollector, 
this.metricsRegistry, changelogSSP, this.jobContext,

Review comment:
       Is there a need to add a unit test for the case where keySerde or msgSerde is missing?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to