sijie closed pull request #1505: [table service] replaying TxnRequest is not 
implemented
URL: https://github.com/apache/bookkeeper/pull/1505
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index a64dbfe4e..1ab5da6e0 100644
--- 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -325,6 +325,10 @@ public void onSuccess(LogRecordWithDLSN record) {
                         record, record.getDlsn(), name());
                 }
                 try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Applying command transaction {} - record {} 
@ {} to mvcc store {}",
+                            record.getTransactionId(), record, 
record.getDlsn(), name());
+                    }
                     commandProcessor.applyCommand(record.getTransactionId(), 
record.getPayloadBuf(), localStore);
 
                     if (record.getDlsn().compareTo(endDLSN) >= 0) {
@@ -334,9 +338,15 @@ public void onSuccess(LogRecordWithDLSN record) {
                         return;
                     }
 
+                    if (log.isDebugEnabled()) {
+                        log.debug("Read next record after {} at mvcc store {}",
+                            record.getDlsn(), name());
+                    }
                     // read next record
                     replayJournal(reader, endDLSN, future);
-                } catch (StateStoreRuntimeException e) {
+                } catch (Exception e) {
+                    log.error("Exception is thrown when applying command 
record {} @ {} to mvcc store {}",
+                        record, record.getDlsn(), name());
                     FutureUtils.completeExceptionally(future, e);
                 }
             }
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
index d4bb484b5..dc3e8059d 100644
--- 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
@@ -25,15 +25,18 @@
 import org.apache.bookkeeper.api.kv.op.DeleteOp;
 import org.apache.bookkeeper.api.kv.op.IncrementOp;
 import org.apache.bookkeeper.api.kv.op.PutOp;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.DeleteResult;
 import org.apache.bookkeeper.api.kv.result.IncrementResult;
 import org.apache.bookkeeper.api.kv.result.PutResult;
+import org.apache.bookkeeper.api.kv.result.TxnResult;
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
 import org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoIncrementOpImpl;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoTxnOpImpl;
 import org.apache.bookkeeper.stream.proto.kv.store.Command;
 
 @Slf4j
@@ -89,7 +92,28 @@ private void applyDeleteOp(long revision,
 
     private void applyTxnCommand(long revision, Command command,
                                  MVCCStoreImpl<byte[], byte[]> store) {
-        throw new UnsupportedOperationException();
+        try (ProtoTxnOpImpl op = ProtoTxnOpImpl.newTxnOp(command.getTxnReq())) 
{
+            applyTxnOp(revision, op, true, store);
+        }
+    }
+
+    private void applyTxnOp(long revision,
+                            TxnOp<byte[], byte[]> op,
+                            boolean ignoreSmallerRevision,
+                            MVCCStoreImpl<byte[], byte[]> localStore) {
+        try (TxnResult<byte[], byte[]> result = 
localStore.processTxn(revision, op)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Result after applying transaction {} : {} - success 
= {}",
+                    revision, result.code(), result.isSuccess());
+            }
+            if (Code.OK == result.code()
+                || (ignoreSmallerRevision && Code.SMALLER_REVISION == 
result.code())) {
+                return;
+            }
+            throw new MVCCStoreException(result.code(),
+                "Failed to apply command " + op + " at revision "
+                    + revision + " to the state store " + localStore.name());
+        }
     }
 
     private void applyIncrCommand(long revision, Command command,
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
index 8bc5965a7..ac96a9289 100644
--- 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
@@ -45,6 +45,9 @@
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
 import 
org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
 import org.apache.bookkeeper.statelib.impl.Constants;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoRangeOpImpl;
 import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
@@ -144,6 +147,20 @@ static RangeRequest toRangeRequest(RangeOp<byte[], byte[]> 
op) {
         return requestOps;
     }
 
+    public static Op<byte[], byte[]> toApiOp(RequestOp protoOp) {
+        switch (protoOp.getRequestCase()) {
+            case REQUEST_PUT:
+                return ProtoPutOpImpl.newPutOp(protoOp.getRequestPut());
+            case REQUEST_RANGE:
+                return ProtoRangeOpImpl.newRangeOp(protoOp.getRequestRange());
+            case REQUEST_DELETE_RANGE:
+                return 
ProtoDeleteOpImpl.newDeleteOp(protoOp.getRequestDeleteRange());
+            default:
+                throw new IllegalArgumentException("Unknown request "
+                    + protoOp.getRequestCase() + " found in a txn request");
+        }
+    }
+
     private static List<Compare> toCompareList(List<CompareOp<byte[], byte[]>> 
ops) {
         List<Compare> compares = 
Lists.newArrayListWithExpectedSize(ops.size());
         for (CompareOp<byte[], byte[]> op : ops) {
@@ -193,6 +210,21 @@ private static Compare toCompare(CompareOp<byte[], byte[]> 
op) {
         }
     }
 
+    public static CompareTarget toApiCompareTarget(Compare.CompareTarget 
target) {
+        switch (target) {
+            case MOD:
+                return CompareTarget.MOD;
+            case CREATE:
+                return CompareTarget.CREATE;
+            case VERSION:
+                return CompareTarget.VERSION;
+            case VALUE:
+                return CompareTarget.VALUE;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare 
target " + target);
+        }
+    }
+
     private static Compare.CompareResult toProtoCompareResult(CompareResult 
result) {
         switch (result) {
             case LESS:
@@ -208,6 +240,21 @@ private static Compare toCompare(CompareOp<byte[], byte[]> 
op) {
         }
     }
 
+    public static CompareResult toApiCompareResult(Compare.CompareResult 
result) {
+        switch (result) {
+            case LESS:
+                return CompareResult.LESS;
+            case EQUAL:
+                return CompareResult.EQUAL;
+            case GREATER:
+                return CompareResult.GREATER;
+            case NOT_EQUAL:
+                return CompareResult.NOT_EQUAL;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare 
result " + result);
+        }
+    }
+
     static TxnRequest toTxnRequest(TxnOp<byte[], byte[]> op) {
         return TxnRequest.newBuilder()
             .addAllSuccess(toRequestOpList(op.successOps()))
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
new file mode 100644
index 000000000..569f9d19b
--- /dev/null
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.Constants.INVALID_REVISION;
+import static 
org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareResult;
+import static 
org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareTarget;
+
+import com.google.protobuf.ByteString;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.CompareResult;
+import org.apache.bookkeeper.api.kv.op.CompareTarget;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+
+/**
+ * A protobuf encoded compare operation.
+ */
+@RequiredArgsConstructor
+@Setter(AccessLevel.PRIVATE)
+@ToString(exclude = "recyclerHandle")
+public class ProtoCompareImpl implements CompareOp<byte[], byte[]> {
+
+    /**
+     * Create a protobuf encoded compare operation.
+     *
+     * @param protoCompare the protobuf representation of a compare operation.
+     * @return a protobuf encoded compare operation
+     */
+    public static ProtoCompareImpl newCompareOp(Compare protoCompare) {
+        ProtoCompareImpl op = RECYCLER.get();
+        op.setRequest(protoCompare);
+        op.setTarget(toApiCompareTarget(protoCompare.getTarget()));
+        op.setResult(toApiCompareResult(protoCompare.getResult()));
+        return op;
+    }
+
+    private static final Recycler<ProtoCompareImpl> RECYCLER = new 
Recycler<ProtoCompareImpl>() {
+        @Override
+        protected ProtoCompareImpl newObject(Handle<ProtoCompareImpl> handle) {
+            return new ProtoCompareImpl(handle);
+        }
+    };
+
+    private final Handle<ProtoCompareImpl> recyclerHandle;
+    private Compare request;
+    private CompareTarget target;
+    private CompareResult result;
+    private byte[] key;
+    private byte[] value;
+
+    private void reset() {
+        request = null;
+        key = null;
+        value = null;
+        target = null;
+        result = null;
+    }
+
+    @Override
+    public CompareTarget target() {
+        return target;
+    }
+
+    @Override
+    public CompareResult result() {
+        return result;
+    }
+
+    @Override
+    public byte[] key() {
+        if (null != key) {
+            return key;
+        }
+        if (ByteString.EMPTY == request.getKey()) {
+            key = null;
+        } else {
+            key = request.getKey().toByteArray();
+        }
+        return key;
+    }
+
+    @Override
+    public byte[] value() {
+        if (null != value) {
+            return value;
+        }
+        if (ByteString.EMPTY == request.getValue()) {
+            value = null;
+        } else {
+            value = request.getValue().toByteArray();
+        }
+        return value;
+    }
+
+    @Override
+    public long revision() {
+        Compare req = request;
+        if (null == req) {
+            return INVALID_REVISION;
+        } else {
+            switch (req.getTarget()) {
+                case MOD:
+                    return req.getModRevision();
+                case CREATE:
+                    return req.getCreateRevision();
+                case VERSION:
+                    return req.getVersion();
+                default:
+                    return INVALID_REVISION;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
index 8ae2b88c0..cdde2458b 100644
--- 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
@@ -42,6 +42,12 @@ public static ProtoPutOpImpl newPutOp(Command command) {
         return op;
     }
 
+    public static ProtoPutOpImpl newPutOp(PutRequest req) {
+        ProtoPutOpImpl op = RECYCLER.get();
+        op.setPutRequest(req);
+        return op;
+    }
+
     private static final Recycler<ProtoPutOpImpl> RECYCLER = new 
Recycler<ProtoPutOpImpl>() {
         @Override
         protected ProtoPutOpImpl newObject(Handle<ProtoPutOpImpl> handle) {
@@ -73,6 +79,10 @@ public void setCommand(Command command) {
         this.req = command.getPutReq();
     }
 
+    public void setPutRequest(PutRequest request) {
+        this.req = request;
+    }
+
     @Override
     public boolean prevKv() {
         return req.getPrevKv();
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
new file mode 100644
index 000000000..37f675c4e
--- /dev/null
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiOp;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.Op;
+import org.apache.bookkeeper.api.kv.op.OpType;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A protobuf encoded transaction operation.
+ */
+@RequiredArgsConstructor
+@ToString(exclude = "recyclerHandle")
+@Setter(AccessLevel.PRIVATE)
+public class ProtoTxnOpImpl implements TxnOp<byte[], byte[]> {
+
+    public static ProtoTxnOpImpl newTxnOp(TxnRequest request) {
+        ProtoTxnOpImpl op = RECYCLER.get();
+        op.setRequest(request);
+        RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps = 
COMPARE_OPS_RECYCLER.newInstance();
+        for (Compare compare : request.getCompareList()) {
+            compareOps.add(ProtoCompareImpl.newCompareOp(compare));
+        }
+        op.setCompareOps(compareOps);
+        RecyclableArrayList<Op<byte[], byte[]>> successOps = 
OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getSuccessList()) {
+            successOps.add(toApiOp(reqOp));
+        }
+        op.setSuccessOps(successOps);
+        RecyclableArrayList<Op<byte[], byte[]>> failureOps = 
OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getFailureList()) {
+            failureOps.add(toApiOp(reqOp));
+        }
+        return op;
+    }
+
+    private static final Recycler<ProtoTxnOpImpl> RECYCLER = new 
Recycler<ProtoTxnOpImpl>() {
+        @Override
+        protected ProtoTxnOpImpl newObject(Handle<ProtoTxnOpImpl> handle) {
+            return new ProtoTxnOpImpl(handle);
+        }
+    };
+
+    private static final RecyclableArrayList.Recycler<CompareOp<byte[], 
byte[]>> COMPARE_OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+    private static final RecyclableArrayList.Recycler<Op<byte[], byte[]>> 
OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+
+    private final Handle<ProtoTxnOpImpl> recyclerHandle;
+    private TxnRequest request;
+    private RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> successOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> failureOps;
+
+    private void reset() {
+        request = null;
+        if (null != compareOps) {
+            compareOps.forEach(CompareOp::close);
+            compareOps.recycle();
+        }
+        if (null != successOps) {
+            successOps.forEach(Op::close);
+            successOps.recycle();
+        }
+        if (null != failureOps) {
+            failureOps.forEach(Op::close);
+            failureOps.recycle();
+        }
+    }
+
+    @Override
+    public List<CompareOp<byte[], byte[]>> compareOps() {
+        return compareOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> successOps() {
+        return successOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> failureOps() {
+        return failureOps;
+    }
+
+    @Override
+    public OpType type() {
+        return OpType.TXN;
+    }
+
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git 
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
 
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
index 610d5d2c3..5314ed8a0 100644
--- 
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
+++ 
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
@@ -29,11 +29,11 @@
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.kv.op.PutOp;
@@ -45,7 +45,6 @@
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
-import org.apache.commons.io.FileUtils;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -54,7 +53,9 @@
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Unit test of {@link MVCCAsyncBytesStoreImpl}.
@@ -62,6 +63,9 @@
 @Slf4j
 public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
 
+    @Rule
+    public final TemporaryFolder testDir = new TemporaryFolder();
+
     private static URI uri;
     private static Namespace namespace;
 
@@ -96,7 +100,7 @@ public void setup() throws Exception {
         super.setup();
         ensureURICreated(uri);
 
-        tempDir = Files.createTempDir();
+        tempDir = testDir.newFolder();
 
         store = new MVCCAsyncBytesStoreImpl(
             () -> new MVCCStoreImpl<>(),
@@ -123,9 +127,6 @@ public void teardown() throws Exception {
         if (null != store) {
             store.close();
         }
-        if (null != tempDir) {
-            FileUtils.deleteDirectory(tempDir);
-        }
         super.teardown();
     }
 
@@ -423,4 +424,48 @@ private void verifyRecords(List<KeyValue<byte[], byte[]>> 
kvs,
         assertEquals(endKey + 1, idx);
     }
 
+    @Test
+    public void testReplayJournal() throws Exception {
+        this.streamName = "test-replay-journal";
+        StateStoreSpec spec = initSpec(streamName);
+        result(store.init(spec));
+
+        int numKvs = 10;
+
+        // putIfAbsent
+        IntStream.range(0, numKvs)
+            .forEach(i -> {
+                try {
+                    result(store.putIfAbsent(getKey(i), getValue(100 + i)));
+                } catch (Exception e) {
+                    log.error("Failed to put kv pair ({})", i, e);
+                }
+            });
+
+        log.info("Closing the store '{}' ...", streamName);
+        // close the store
+        store.close();
+        log.info("Closed the store '{}' ...", streamName);
+
+        // open the store again to replay the journal.
+        store = new MVCCAsyncBytesStoreImpl(
+            () -> new MVCCStoreImpl<>(),
+            () -> namespace);
+        spec = StateStoreSpec.builder()
+            .name(streamName)
+            .keyCoder(ByteArrayCoder.of())
+            .valCoder(ByteArrayCoder.of())
+            .stream(streamName)
+            .localStateStoreDir(testDir.newFolder())
+            .build();
+        result(store.init(spec));
+
+        // verify the key/value pairs
+        for (int i = 0; i < numKvs; i++) {
+            byte[] value = result(store.get(getKey(i)));
+            assertNotNull(value);
+            assertArrayEquals(getValue(100 + i), value);
+        }
+    }
+
 }
diff --git 
a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
 
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
new file mode 100644
index 000000000..6e50bafac
--- /dev/null
+++ 
b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.protobuf.ByteString;
+import lombok.Cleanup;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareResult;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareTarget;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ProtoCompareImpl}.
+ */
+public class ProtoCompareImplTest {
+
+    private static final ByteString KEY = ByteString.copyFromUtf8("test-key");
+    private static final ByteString VAL = 
ByteString.copyFromUtf8("test-value");
+    private static final long MOD_REV = System.currentTimeMillis();
+    private static final long CREATE_REV = MOD_REV + 1;
+    private static final long VERSION = CREATE_REV + 1;
+
+    @Test
+    public void testCompareEmptyValue() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VALUE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = 
ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertNull(protoCompare.value());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, 
protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE, 
protoCompare.target());
+    }
+
+    @Test
+    public void testCompareValue() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setValue(VAL)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VALUE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = 
ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertArrayEquals("test-value".getBytes(UTF_8), protoCompare.value());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, 
protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE, 
protoCompare.target());
+    }
+
+    @Test
+    public void testCompareMod() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setModRevision(MOD_REV)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.MOD)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = 
ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(MOD_REV, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, 
protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.MOD, 
protoCompare.target());
+    }
+
+    @Test
+    public void testCompareCreate() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setCreateRevision(CREATE_REV)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.CREATE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = 
ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(CREATE_REV, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, 
protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.CREATE, 
protoCompare.target());
+    }
+
+    @Test
+    public void testCompareVersion() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setVersion(VERSION)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VERSION)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = 
ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(VERSION, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, 
protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VERSION, 
protoCompare.target());
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to