This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9e8dcbe SAMZA-2643: Cleanup redundant null serde checks in
ContainerStorageManager, these checks should only be in
BaseKeyValueStorageEngineFactory (#1486)
9e8dcbe is described below
commit 9e8dcbe787b2d331e731cdeb263f1d159959a925
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Mar 30 18:34:28 2021 -0700
SAMZA-2643: Cleanup redundant null serde checks in ContainerStorageManager,
these checks should only be in BaseKeyValueStorageEngineFactory (#1486)
Feature/Issue:
BaseKeyValueStorageEngineFactory assumes non-null serdes for k,v of stores
using it and applies check/ ContainerStorageManager also applies redundant
check to verify the same, however, this might not hold true for some
StorageEngine use cases, since some stores may elect to not expose serdes but
just use Java Objects as K, V
Changes:
Remove the redundant check from ContainerStorageManager that assumes each
store should have non-null serdes
Add docs to BaseKeyValueStorageEngineFactory on serde assumption
Add more docs on assumptions for serdes and existence between wrappedStore
& rawStore in KeyValueStorageEngine API
---
.../samza/storage/ContainerStorageManager.java | 23 ++++++----------------
.../kv/BaseKeyValueStorageEngineFactory.java | 2 ++
.../samza/storage/kv/KeyValueStorageEngine.scala | 3 +++
3 files changed, 11 insertions(+), 17 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 412a3c1..c333748 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -490,25 +490,14 @@ public class ContainerStorageManager {
this.storeDirectoryPaths.add(storeDirectory.toPath());
Optional<String> storageKeySerde =
storageConfig.getStorageKeySerde(storeName);
- if (!storageKeySerde.isPresent()) {
- throw new SamzaException("No key serde defined for store: " + storeName);
+ Serde keySerde = null;
+ if (storageKeySerde.isPresent()) {
+ keySerde = serdes.get(storageKeySerde.get());
}
-
- Serde keySerde = serdes.get(storageKeySerde.get());
- if (keySerde == null) {
- throw new SamzaException(
- "StorageKeySerde: No class defined for serde: " +
storageKeySerde.get());
- }
-
Optional<String> storageMsgSerde =
storageConfig.getStorageMsgSerde(storeName);
- if (!storageMsgSerde.isPresent()) {
- throw new SamzaException("No msg serde defined for store: " + storeName);
- }
-
- Serde messageSerde = serdes.get(storageMsgSerde.get());
- if (messageSerde == null) {
- throw new SamzaException(
- "StorageMsgSerde: No class defined for serde: " +
storageMsgSerde.get());
+ Serde messageSerde = null;
+ if (storageMsgSerde.isPresent()) {
+ messageSerde = serdes.get(storageMsgSerde.get());
}
// if taskInstanceMetrics are specified use those for store metrics,
diff --git
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
index a3fc178..d6c1196 100644
---
a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
+++
b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.java
@@ -42,6 +42,8 @@ import org.apache.samza.util.ScalaJavaUtil;
* This encapsulates all the steps needed to create a key value storage engine.
* This is meant to be extended by the specific key value store factory
implementations which will in turn override the
* getKVStore method to return a raw key-value store.
+ *
+ * BaseKeyValueStorageEngineFactory assumes non null keySerde and msgSerde
*/
public abstract class BaseKeyValueStorageEngineFactory<K, V> implements
StorageEngineFactory<K, V> {
private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
diff --git
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index fc6cb53..baa43d1 100644
---
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -36,6 +36,9 @@ import org.apache.samza.checkpoint.CheckpointId
* A key value store.
*
* This implements both the key/value interface and the storage engine
interface.
+ *
+ * There should be no implicit assumption that wrappedStore is a logical
extension or a decorated view of rawStore
+ * Either can exist independently
*/
class KeyValueStorageEngine[K, V](
storeName: String,