Repository: incubator-ratis Updated Branches: refs/heads/master ed8e60dad -> 523fb6383
RATIS-317. Log service state machine Signed-off-by: Josh Elser <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/523fb638 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/523fb638 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/523fb638 Branch: refs/heads/master Commit: 523fb63832e574fac0a3b8cc65f548e91940dba5 Parents: ed8e60d Author: Vladimir Rodionov <[email protected]> Authored: Wed Sep 19 12:45:01 2018 -0700 Committer: Josh Elser <[email protected]> Committed: Thu Sep 20 13:42:44 2018 -0400 ---------------------------------------------------------------------- .../apache/ratis/logservice/api/LogMessage.java | 175 ++++++++++++++++ .../apache/ratis/logservice/api/LogName.java | 8 + .../ratis/logservice/api/LogStateMachine.java | 197 +++++++++++++++++++ .../org/apache/ratis/logservice/util/Utils.java | 48 +++++ .../ratis/logservice/api/TestLogMessage.java | 55 ++++++ .../apache/ratis/logservice/util/TestUtils.java | 95 +++++++++ .../src/main/proto/Logservice.proto | 35 ++++ 7 files changed, 613 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java new file mode 100644 index 0000000..bf2024f --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogMessage.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import java.nio.charset.Charset; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.shaded.proto.logservice.LogServiceProtos; + +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; + +public class LogMessage implements Message { + public static final Charset UTF8 = Charset.forName("UTF-8"); + /* + * Type of message + */ + public static enum Type { + READ_REQUEST, READ_REPLY, WRITE + } + + /* + * For all READ and WRITE requests + */ + private final LogName name; + + /* + * Set only for READ_REPLY response + */ + private long length; + + /* + * Pay load for WRITE request + */ + private byte[] data; + + /* + * Type of message + */ + + private Type type; + + /** + * Constructor for WRITE request + * @param logName name of a log + * @param data pay load data + */ + public LogMessage(LogName logName, byte[] data) { + this.name = logName; + this.data = data; + this.type = Type.WRITE; + } + + /** + * Constructor for READ reply + * @param logName name of a log + * @param length length of a log + */ + public LogMessage(LogName logName, long length) { + this.name = logName; + this.length = length; + this.type = Type.READ_REPLY; + } + + /** + * Constructor for READ request + * @param logName name of a log + */ + public LogMessage(LogName logName) { + this.name = logName; + this.type = Type.READ_REQUEST; + } + + public static LogMessage parseFrom(ByteString data) + throws InvalidProtocolBufferException { + LogServiceProtos.LogMessage msg = LogServiceProtos.LogMessage.parseFrom(data); + LogServiceProtos.MessageType type = msg.getType(); + long length = 0; + LogName name = null; + byte[] bdata = null; + name = LogName.of(msg.getLogName()); + switch (type) { + case READ_REPLY: + length = msg.getLength(); + return new LogMessage(name, length); + case READ_REQUEST: + return new LogMessage(name); + case WRITE: + bdata = msg.getData().toByteArray(); + return new LogMessage(name, bdata); + default: + //TODO replace exception + throw new RuntimeException("Wrong message type: "+ type); + } + } + + + /** + * Get log name + * @return log name + */ + public LogName getLogName() { + return name; + } + + /** + * Get log length + * @return log length + */ + public long getLength() { + return length; + } + + /** + * Get log message data + * @return data + */ + public byte[] getData() { + return data; + } + + /** + * Get message type + * @return message type + */ + public Type getType() { + return type; + } + + + @SuppressWarnings("deprecation") + @Override + public ByteString getContent() { + LogServiceProtos.LogMessage.Builder builder = LogServiceProtos.LogMessage.newBuilder(); + builder.setType(LogServiceProtos.MessageType.valueOf(type.ordinal())); + builder.setLogName(name.getName()); + switch (type) { + case READ_REPLY: + builder.setLength(length); + break; + case WRITE: + builder.setData(ByteString.copyFrom(data)); + break; + default: + } + + return builder.build().toByteString(); + } + + @Override + public String toString() { + String s = type.name() + " : log=" + name.getName(); + if (type == Type.READ_REPLY) { + s += " len=" + length; + } else if (type == Type.WRITE) { + s += " data len=" + data.length; + } + return s; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java index 3227845..bbeb0a5 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogName.java @@ -62,6 +62,14 @@ public class LogName { } /** + * Length of a log's name + * @return length + */ + public int getLength() { + return name.length(); + } + + /** * Creates a {@link LogName} given the provided string. */ public static LogName of(String name) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java new file mode 100644 index 0000000..3444dd5 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStateMachine.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; +import org.apache.ratis.util.AutoCloseableLock; + +import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; + +import java.io.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class LogStateMachine extends BaseStateMachine { + private final Map<LogName, Long> state = new ConcurrentHashMap<>(); + + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + private AutoCloseableLock readLock() { + return AutoCloseableLock.acquire(lock.readLock()); + } + + private AutoCloseableLock writeLock() { + return AutoCloseableLock.acquire(lock.writeLock()); + } + + void reset() { + state.clear(); + setLastAppliedTermIndex(null); + } + + @Override + public void initialize(RaftServer server, RaftGroupId groupId, + RaftStorage raftStorage) throws IOException { + super.initialize(server, groupId, raftStorage); + this.storage.init(raftStorage); + loadSnapshot(storage.getLatestSnapshot()); + } + + @Override + public void reinitialize() throws IOException { + close(); + loadSnapshot(storage.getLatestSnapshot()); + } + + @Override + public long takeSnapshot() { + final Map<LogName, Long> copy; + final TermIndex last; + try(final AutoCloseableLock readLock = readLock()) { + copy = new HashMap<>(state); + last = getLastAppliedTermIndex(); + } + + final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex()); + LOG.info("Taking a snapshot to file {}", snapshotFile); + + try(final ObjectOutputStream out = new ObjectOutputStream( + new BufferedOutputStream(new FileOutputStream(snapshotFile)))) { + out.writeObject(copy); + } catch(IOException ioe) { + LOG.warn("Failed to write snapshot file \"" + snapshotFile + + "\", last applied index=" + last); + } + + return last.getIndex(); + } + + private long loadSnapshot(SingleFileSnapshotInfo snapshot) throws IOException { + return load(snapshot, false); + } + + @SuppressWarnings("unchecked") + private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws IOException { + if (snapshot == null) { + LOG.warn("The snapshot info is null."); + return RaftServerConstants.INVALID_LOG_INDEX; + } + final File snapshotFile = snapshot.getFile().getPath().toFile(); + if (!snapshotFile.exists()) { + LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotFile, snapshot); + return RaftServerConstants.INVALID_LOG_INDEX; + } + + final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); + try(final AutoCloseableLock writeLock = writeLock(); + final ObjectInputStream in = new ObjectInputStream( + new BufferedInputStream(new FileInputStream(snapshotFile)))) { + if (reload) { + reset(); + } + setLastAppliedTermIndex(last); + state.putAll((Map<LogName, Long>) in.readObject()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + return last.getIndex(); + } + + @Override + public StateMachineStorage getStateMachineStorage() { + return storage; + } + + @Override + public CompletableFuture<Message> query(Message request) { + LogMessage msg = null; + try { + msg = LogMessage.parseFrom(request.getContent()); + LogName logName = msg.getLogName(); + Long len = null; + try(final AutoCloseableLock readLock = readLock()) { + len = state.get(logName); + if (len == null) { + len = new Long(-1); + } + } + LOG.debug("QUERY: {}, RESULT: {}", msg, len); + return CompletableFuture.completedFuture(new LogMessage (logName, len)); + } catch (InvalidProtocolBufferException e) { + //TODO exception handling + throw new RuntimeException(e); + } + + } + + + @Override + public void close() { + reset(); + } + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + try { + final LogEntryProto entry = trx.getLogEntry(); + final LogMessage logMessage = LogMessage.parseFrom((entry.getSmLogEntry().getData())); + + final long index = entry.getIndex(); + Long val = null; + LogName name = null; + try (final AutoCloseableLock writeLock = writeLock()) { + name = logMessage.getLogName(); + long dataLength = logMessage.getData().length; + val = state.get(name); + if (val == null) { + val = new Long(0); + } + state.put(name, val + dataLength); + updateLastAppliedTermIndex(entry.getTerm(), index); + } + final CompletableFuture<Message> f = + CompletableFuture.completedFuture(new LogMessage(name, val)); + final RaftProtos.RaftPeerRole role = trx.getServerRole(); + LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, logMessage, val); + if (LOG.isTraceEnabled()) { + LOG.trace("{}-{}: variables={}", getId(), index, state); + } + return f; + } catch (InvalidProtocolBufferException e) { + // TODO exception handling + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java new file mode 100644 index 0000000..3709b6f --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/Utils.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.util; + +public class Utils { + + + public static int long2bytes(long v, byte[] buf, int offset) { + int2bytes((int)(v >>> 32), buf, offset); + int2bytes((int) v , buf, offset + 4); + return 8; + } + + public static int bytes2int(byte[] buf, int offset) { + return (buf[offset] << 24) + + ((0xFF & buf[offset + 1]) << 16) + + ((0xFF & buf[offset + 2]) << 8) + + (0xFF & buf[offset + 3]); + } + + public static long bytes2long(byte[] buf, int offset) { + return ((long)bytes2int(buf, offset) << 32) + + (0xFFFFFFFFL & bytes2int(buf, offset + 4)); + } + + public static int int2bytes(int v, byte[] buf, int offset) { + buf[offset ] = (byte) (v >>> 24); + buf[offset + 1] = (byte) (v >>> 16); + buf[offset + 2] = (byte) (v >>> 8); + buf[offset + 3] = (byte) (v); + return 4; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java new file mode 100644 index 0000000..fb74d64 --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/api/TestLogMessage.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.api; + +import static org.junit.Assert.assertEquals; + +import org.apache.ratis.logservice.api.LogMessage.Type; +import org.junit.Test; + +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException; + +public class TestLogMessage { + + @Test + public void testLogMessages() throws InvalidProtocolBufferException { + + LogMessage msg = new LogMessage(LogName.of("testLog")); + assertEquals(Type.READ_REQUEST, msg.getType()); + ByteString content = msg.getContent(); + LogMessage other = LogMessage.parseFrom(content); + assertEquals(msg.toString(), other.toString()); + + msg = new LogMessage(LogName.of("testLog"), 100); + assertEquals(Type.READ_REPLY, msg.getType()); + content = msg.getContent(); + other = LogMessage.parseFrom(content); + assertEquals(msg.toString(), other.toString()); + + msg = new LogMessage(LogName.of("testLog"), new byte[] { 0, 0, 0 }); + assertEquals(Type.WRITE, msg.getType()); + content = msg.getContent(); + other = LogMessage.parseFrom(content); + assertEquals(msg.toString(), other.toString()); + assertEquals(msg.getData().length, other.getData().length); + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestUtils.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestUtils.java new file mode 100644 index 0000000..752889a --- /dev/null +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestUtils.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.util; + +import static org.apache.ratis.logservice.util.Utils.bytes2int; +import static org.apache.ratis.logservice.util.Utils.int2bytes; +import static org.apache.ratis.logservice.util.Utils.bytes2long; +import static org.apache.ratis.logservice.util.Utils.long2bytes; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class TestUtils { + + @Test + public void testInt2Bytes() { + + byte[] buf = new byte[4]; + int v1 = 0; + int2bytes(v1, buf, 0); + int vv1 = bytes2int(buf, 0); + assertEquals(v1, vv1); + int v2 = 1; + int2bytes(v2, buf, 0); + int vv2 = bytes2int(buf, 0); + assertEquals(v2, vv2); + int v3 = -1; + int2bytes(v3, buf, 0); + int vv3 = bytes2int(buf, 0); + assertEquals(v3, vv3); + int v4 = Integer.MIN_VALUE; + int2bytes(v4, buf, 0); + int vv4 = bytes2int(buf, 0); + assertEquals(v4, vv4); + int v5 = Integer.MAX_VALUE; + int2bytes(v5, buf, 0); + int vv5 = bytes2int(buf, 0); + assertEquals(v5, vv5); + + } + + + @Test + public void testLong2Bytes() { + byte[] buf = new byte[8]; + long v1 = 0; + long2bytes(v1, buf, 0); + long vv1 = bytes2long(buf, 0); + assertEquals(v1, vv1); + + long v2 = 1; + long2bytes(v2, buf, 0); + long vv2 = bytes2long(buf, 0); + assertEquals(v2, vv2); + + long v3 = -1; + long2bytes(v3, buf, 0); + long vv3 = bytes2long(buf, 0); + assertEquals(v3, vv3); + + long v4 = Long.MIN_VALUE; + long2bytes(v4, buf, 0); + long vv4 = bytes2long(buf, 0); + assertEquals(v4, vv4); + + long v5 = Integer.MAX_VALUE; + long2bytes(v5, buf, 0); + long vv5 = bytes2long(buf, 0); + assertEquals(v5, vv5); + + long v6 = 100; + buf = new byte[20]; + long2bytes(v6, buf, 12); + long vv6 = bytes2long(buf, 12); + assertEquals(v6, vv6); + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/523fb638/ratis-proto-shaded/src/main/proto/Logservice.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Logservice.proto b/ratis-proto-shaded/src/main/proto/Logservice.proto new file mode 100644 index 0000000..fd586bf --- /dev/null +++ b/ratis-proto-shaded/src/main/proto/Logservice.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +option java_package = "org.apache.ratis.shaded.proto.logservice"; +option java_outer_classname = "LogServiceProtos"; +option java_generate_equals_and_hash = true; +package ratis.logservice; + +enum MessageType { + READ_REQUEST = 0; + READ_REPLY = 1; + WRITE = 2; +} + +message LogMessage { + MessageType type = 1; + string log_name = 2; + uint64 length = 3; + bytes data = 4; +} \ No newline at end of file
