This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 3421d527b [STORM-1316] port storm.trident.state-test to java (#3240)
3421d527b is described below
commit 3421d527b6ed59075b3ba0812ab7a8e1974a5873
Author: nd368 <[email protected]>
AuthorDate: Mon Oct 23 19:21:11 2023 +0100
[STORM-1316] port storm.trident.state-test to java (#3240)
* [STORM-1316] port storm.trident.state-test to java
* Adress review comments + fix Mockito upgrade related things
---------
Co-authored-by: Richard Zowalla <[email protected]>
---
.../storm/trident/testing/MemoryBackingMap.java | 6 +-
.../clj/org/apache/storm/trident/state_test.clj | 148 -----------------
.../jvm/org/apache/storm/trident/StateTest.java | 175 +++++++++++++++++++++
3 files changed, 178 insertions(+), 151 deletions(-)
diff --git
a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
index 5dc43f51d..f9ebce0d4 100644
---
a/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
+++
b/storm-client/src/jvm/org/apache/storm/trident/testing/MemoryBackingMap.java
@@ -18,11 +18,11 @@ import java.util.List;
import java.util.Map;
import org.apache.storm.trident.state.map.IBackingMap;
-public class MemoryBackingMap implements IBackingMap<Object> {
+public class MemoryBackingMap<T> implements IBackingMap<T> {
Map vals = new HashMap();
@Override
- public List<Object> multiGet(List<List<Object>> keys) {
+ public List<T> multiGet(List<List<Object>> keys) {
List ret = new ArrayList();
for (List key : keys) {
ret.add(vals.get(key));
@@ -31,7 +31,7 @@ public class MemoryBackingMap implements IBackingMap<Object> {
}
@Override
- public void multiPut(List<List<Object>> keys, List<Object> vals) {
+ public void multiPut(List<List<Object>> keys, List<T> vals) {
for (int i = 0; i < keys.size(); i++) {
List key = keys.get(i);
Object val = vals.get(i);
diff --git a/storm-core/test/clj/org/apache/storm/trident/state_test.clj
b/storm-core/test/clj/org/apache/storm/trident/state_test.clj
deleted file mode 100644
index d42034e13..000000000
--- a/storm-core/test/clj/org/apache/storm/trident/state_test.clj
+++ /dev/null
@@ -1,148 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.trident.state-test
- (:use [clojure test])
- (:import [org.apache.storm.trident.operation.builtin Count])
- (:import [org.apache.storm.trident.state OpaqueValue])
- (:import [org.apache.storm.trident.state CombinerValueUpdater])
- (:import [org.apache.storm.trident.topology.state TransactionalState
TestTransactionalState])
- (:import [org.apache.storm.trident.state.map TransactionalMap OpaqueMap])
- (:import [org.apache.storm.trident.testing MemoryBackingMap MemoryMapState])
- (:import [org.apache.storm.utils ZookeeperAuthInfo Utils])
- (:import [org.apache.storm.shade.org.apache.curator.framework
CuratorFramework])
- (:import [org.apache.storm.shade.org.apache.curator.framework.api
CreateBuilder ProtectACLCreateModeStatPathAndBytesable])
- (:import [org.apache.zookeeper CreateMode ZooDefs ZooDefs$Ids])
- (:import [org.mockito ArgumentMatchers Mockito])
- (:import [org.mockito.exceptions.base MockitoAssertionError])
- (:use [org.apache.storm config]))
-
-(defn single-remove [map key]
- (-> map (.multiRemove [[key]])))
-
-(defn single-put [map key val]
- (-> map (.multiPut [[key]] [val])))
-
-(defn single-get [map key]
- (-> map (.multiGet [[key]]) first))
-
-(defn single-update [map key amt]
- (-> map (.multiUpdate [[key]] [(CombinerValueUpdater. (Count.) amt)]) first))
-
-(deftest test-opaque-value
- (let [opqval (OpaqueValue. 8 "v1" "v0")
- upval0 (.update opqval 8 "v2")
- upval1 (.update opqval 9 "v2")
- ]
- (is (= "v1" (.get opqval nil)))
- (is (= "v1" (.get opqval 100)))
- (is (= "v1" (.get opqval 9)))
- (is (= "v0" (.get opqval 8)))
- (let [has-exception (try
- (.get opqval 7) false
- (catch Exception e true))]
- (is (= true has-exception)))
- (is (= "v0" (.getPrev opqval)))
- (is (= "v1" (.getCurr opqval)))
- ;; update with current
- (is (= "v0" (.getPrev upval0)))
- (is (= "v2" (.getCurr upval0)))
- (not (identical? opqval upval0))
- ;; update
- (is (= "v1" (.getPrev upval1)))
- (is (= "v2" (.getCurr upval1)))
- (not (identical? opqval upval1))
- ))
-
-(deftest test-opaque-map
- (let [map (OpaqueMap/build (MemoryBackingMap.))]
- (.beginCommit map 1)
- (is (= nil (single-get map "a")))
- ;; tests that intra-batch caching works
- (is (= 1 (single-update map "a" 1)))
- (is (= 1 (single-get map "a")))
- (is (= 3 (single-update map "a" 2)))
- (is (= 3 (single-get map "a")))
- (.commit map 1)
- (.beginCommit map 1)
- (is (= nil (single-get map "a")))
- (is (= 2 (single-update map "a" 2)))
- (.commit map 1)
- (.beginCommit map 2)
- (is (= 2 (single-get map "a")))
- (is (= 5 (single-update map "a" 3)))
- (is (= 6 (single-update map "a" 1)))
- (.commit map 2)
- ))
-
-(deftest test-transactional-map
- (let [map (TransactionalMap/build (MemoryBackingMap.))]
- (.beginCommit map 1)
- (is (= nil (single-get map "a")))
- ;; tests that intra-batch caching works
- (is (= 1 (single-update map "a" 1)))
- (is (= 3 (single-update map "a" 2)))
- (.commit map 1)
- (.beginCommit map 1)
- (is (= 3 (single-get map "a")))
- ;; tests that intra-batch caching has no effect if it's the same commit as
previous commit
- (is (= 3 (single-update map "a" 1)))
- (is (= 3 (single-update map "a" 2)))
- (.commit map 1)
- (.beginCommit map 2)
- (is (= 3 (single-get map "a")))
- (is (= 6 (single-update map "a" 3)))
- (is (= 7 (single-update map "a" 1)))
- (.commit map 2)
- ))
-
-(deftest test-create-node-acl
- (testing "Creates ZooKeeper nodes with the correct ACLs"
- (let [curator (Mockito/mock CuratorFramework)
- builder0 (Mockito/mock CreateBuilder)
- builder1 (Mockito/mock ProtectACLCreateModeStatPathAndBytesable)
- expectedAcls ZooDefs$Ids/CREATOR_ALL_ACL]
- (. (Mockito/when (.create curator)) (thenReturn builder0))
- (. (Mockito/when (.creatingParentsIfNeeded builder0)) (thenReturn
builder1))
- (. (Mockito/when (.withMode builder1 (ArgumentMatchers/isA CreateMode)))
(thenReturn builder1))
- (. (Mockito/when (.withACL builder1 (Mockito/anyList))) (thenReturn
builder1))
- (TestTransactionalState/createNode curator "" (byte-array 0)
expectedAcls nil)
- (is (nil?
- (try
- (. (Mockito/verify builder1) (withACL expectedAcls))
- (catch MockitoAssertionError e
- e)))))))
-
-(deftest test-memory-map-state-remove
- (let [map (MemoryMapState. (Utils/uuid))]
- (.beginCommit map 1)
- (single-put map "a" 1)
- (single-put map "b" 2)
- (.commit map 1)
- (.beginCommit map 2)
- (single-remove map "a")
- (is (nil? (single-get map "a")))
- (is (= 2 (single-get map "b")))
- (.commit map 2)
- (.beginCommit map 2)
- (is (= 1 (single-get map "a")))
- (is (= 2 (single-get map "b")))
- (single-remove map "a")
- (.commit map 2)
- (.beginCommit map 3)
- (is (nil? (single-get map "a")))
- (is (= 2 (single-get map "b")))
- (.commit map 3)
- ))
diff --git a/storm-core/test/jvm/org/apache/storm/trident/StateTest.java
b/storm-core/test/jvm/org/apache/storm/trident/StateTest.java
new file mode 100644
index 000000000..68f615d2a
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/trident/StateTest.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.trident;
+
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.shade.org.apache.curator.framework.api.CreateBuilder;
+import
org.apache.storm.shade.org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
+import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.state.CombinerValueUpdater;
+import org.apache.storm.trident.state.OpaqueValue;
+import org.apache.storm.trident.state.ValueUpdater;
+import org.apache.storm.trident.state.map.MapState;
+import org.apache.storm.trident.state.map.OpaqueMap;
+import org.apache.storm.trident.state.map.TransactionalMap;
+import org.apache.storm.trident.testing.MemoryBackingMap;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.topology.state.TestTransactionalState;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class StateTest {
+
+ private void singleRemove(MemoryMapState<Object> map, Object key){
+ List<List<Object>> keys =
Collections.singletonList(Collections.singletonList(key));
+ map.multiRemove(keys);
+ }
+
+ private void singlePut(MemoryMapState<Object> map, Object key, Object val){
+ List<List<Object>> keys =
Collections.singletonList(Collections.singletonList(key));
+ List<Object> vals = Collections.singletonList(val);
+ map.multiPut(keys, vals);
+ }
+
+ private Object singleGet(MapState<Object> map, Object key){
+ List<List<Object>> keys =
Collections.singletonList(Collections.singletonList(key));
+ return map.multiGet(keys).get(0);
+ }
+
+ private Object singleUpdate(MapState<Object> map, Object key, Long amt){
+ List<List<Object>> keys =
Collections.singletonList(Collections.singletonList(key));
+ CombinerValueUpdater valueUpdater = new CombinerValueUpdater(new
Count(), amt);
+ List<ValueUpdater> updaters = Collections.singletonList(valueUpdater);
+ return map.multiUpdate(keys, updaters).get(0);
+ }
+
+ @Test
+ public void testOpaqueValue() {
+ OpaqueValue<String> opqval = new OpaqueValue<>(8L, "v1", "v0");
+ OpaqueValue<String> upval0 = opqval.update(8L, "v2");
+ OpaqueValue<String> upval1 = opqval.update(9L, "v2");
+ assertEquals(opqval.get(null), "v1");
+ assertEquals(opqval.get(100L), "v1");
+ assertEquals(opqval.get(9L), "v1");
+ assertEquals(opqval.get(8L), "v0");
+ Assertions.assertThrows(Exception.class, () -> opqval.get(7L));
+ assertEquals(opqval.getPrev(), "v0");
+ assertEquals(opqval.getCurr(), "v1");
+ // update with current
+ assertEquals(upval0.getPrev(), "v0");
+ assertEquals(upval0.getCurr(), "v2");
+ assertNotEquals(opqval, upval0);
+ // update
+ assertEquals(upval1.getPrev(), "v1");
+ assertEquals(upval1.getCurr(), "v2");
+ assertNotEquals(opqval, upval1);
+ }
+
+ @Test
+ public void testOpaqueMap() {
+ MapState<Object> map = OpaqueMap.build(new MemoryBackingMap<>());
+ map.beginCommit(1L);
+ assertEquals(singleGet(map, "a"), null);
+ // tests that intra-batch caching works
+ assertEquals(singleUpdate(map, "a", 1L), 1L);
+ assertEquals(singleGet(map, "a"), 1L);
+ assertEquals(singleUpdate(map, "a", 2L), 3L);
+ assertEquals(singleGet(map, "a"), 3L);
+ map.commit(1L);
+ map.beginCommit(1L);
+ assertEquals(singleGet(map, "a"), null);
+ assertEquals(singleUpdate(map, "a", 2L), 2L);
+ map.commit(1L);
+ map.beginCommit(2L);
+ assertEquals(singleGet(map, "a"), 2L);
+ assertEquals(singleUpdate(map, "a", 3L), 5L);
+ assertEquals(singleUpdate(map, "a", 1L), 6L);
+ map.commit(2L);
+ }
+
+ @Test
+ public void testTransactionalMap() {
+ MapState<Object> map = TransactionalMap.build(new
MemoryBackingMap<>());
+ map.beginCommit(1L);
+ assertEquals(singleGet(map, "a"), null);
+ // tests that intra-batch caching works
+ assertEquals(singleUpdate(map, "a", 1L), 1L);
+ assertEquals(singleUpdate(map, "a", 2L), 3L);
+ map.commit(1L);
+ map.beginCommit(1L);
+ assertEquals(singleGet(map, "a"), 3L);
+ // tests that intra-batch caching has no effect if it's the same
commit as previous commit
+ assertEquals(singleUpdate(map, "a", 1L), 3L);
+ assertEquals(singleUpdate(map, "a", 2L), 3L);
+ map.commit(1L);
+ map.beginCommit(2L);
+ assertEquals(singleGet(map, "a"), 3L);
+ assertEquals(singleUpdate(map, "a", 3L), 6L);
+ assertEquals(singleUpdate(map, "a", 1L), 7L);
+ map.commit(2L);
+ }
+
+ @Test
+ public void testCreateNodeAcl() throws Exception {
+ // Creates ZooKeeper nodes with the correct ACLs
+ CuratorFramework curator = Mockito.mock(CuratorFramework.class);
+ CreateBuilder builder0 = Mockito.mock(CreateBuilder.class);
+ ProtectACLCreateModeStatPathAndBytesable builder1 =
Mockito.mock(ProtectACLCreateModeStatPathAndBytesable.class);
+ List<ACL> expectedAcls = ZooDefs.Ids.CREATOR_ALL_ACL;
+ Mockito.when(curator.create()).thenReturn(builder0);
+ Mockito.when(builder0.creatingParentsIfNeeded()).thenReturn(builder1);
+
Mockito.when(builder1.withMode(ArgumentMatchers.isA(CreateMode.class))).thenReturn(builder1);
+ Mockito.when(builder1.withACL(Mockito.anyList())).thenReturn(builder1);
+ TestTransactionalState.createNode(curator, "", new byte[0],
expectedAcls, null);
+ Mockito.verify(builder1).withACL(expectedAcls);
+ }
+
+ @Test
+ public void testMemoryMapStateRemove() {
+ MemoryMapState<Object> map = new MemoryMapState<>(Utils.uuid());
+ map.beginCommit(1L);
+ singlePut(map, "a", 1);
+ singlePut(map, "b", 2);
+ map.commit(1L);
+ map.beginCommit(2L);
+ singleRemove(map, "a");
+ assertEquals(singleGet(map, "a"), null);
+ assertEquals(singleGet(map, "b"), 2);
+ map.commit(2L);
+ map.beginCommit(2L);
+ assertEquals(singleGet(map, "a"), 1);
+ assertEquals(singleGet(map, "b"), 2);
+ singleRemove(map, "a");
+ map.commit(2L);
+ map.beginCommit(3L);
+ assertEquals(singleGet(map, "a"), null);
+ assertEquals(singleGet(map, "b"), 2);
+ map.commit(3L);
+ }
+}