This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c2a6a24 RATIS-588 LogStream StateMachine export
c2a6a24 is described below
commit c2a6a24a9ff85cc24132f5d5db40391bd36fbe70
Author: Ankit Singhal <[email protected]>
AuthorDate: Wed Jun 12 10:57:50 2019 -0700
RATIS-588 LogStream StateMachine export
Closes #25
Signed-off-by: Josh Elser <[email protected]>
---
ratis-logservice/pom.xml | 39 +++
.../ratis/logservice/api/ArchiveLogReader.java | 27 ++
.../ratis/logservice/api/ArchiveLogWriter.java | 45 +++
.../org/apache/ratis/logservice/api/LogReader.java | 4 +-
.../org/apache/ratis/logservice/api/LogStream.java | 24 +-
.../ratis/logservice/client/LogServiceClient.java | 120 ++++++-
.../apache/ratis/logservice/common/Constants.java | 1 +
.../logservice/impl/ArchiveHdfsLogReader.java | 215 ++++++++++++
.../logservice/impl/ArchiveHdfsLogWriter.java | 112 +++++++
.../logservice/impl/ArchivedLogStreamImpl.java | 138 ++++++++
.../logservice/impl/ExportedLogStreamImpl.java | 35 ++
.../ratis/logservice/impl/LogStreamImpl.java | 12 +-
.../ratis/logservice/server/ArchivalInfo.java | 95 ++++++
.../apache/ratis/logservice/server/LogServer.java | 6 +-
.../logservice/server/LogServiceRaftLogReader.java | 14 +-
.../ratis/logservice/server/LogStateMachine.java | 369 +++++++++++++++++++--
.../ratis/logservice/server/MetaStateMachine.java | 16 +-
.../ratis/logservice/server/RaftLogReader.java | 51 +++
.../ratis/logservice/shell/CommandFactory.java | 4 +
.../shell/commands/ArchiveLogCommand.java | 47 +++
.../shell/commands/ExportLogCommand.java | 49 +++
.../ratis/logservice/util/LogServiceProtoUtil.java | 61 +++-
.../ratis/logservice/util/LogServiceUtils.java | 20 ++
.../logservice/util/MetaServiceProtoUtil.java | 9 -
ratis-logservice/src/main/proto/LogService.proto | 44 ++-
ratis-logservice/src/main/proto/MetaService.proto | 8 -
.../ratis/logservice/LogServiceReadWriteBase.java | 7 +-
.../impl/TestArchiveHdfsLogReaderAndWriter.java | 153 +++++++++
.../ratis/logservice/server/TestMetaServer.java | 100 +++++-
.../logservice/util/TestLogServiceProtoUtil.java | 7 +-
ratis-logservice/src/test/resources/logservice.xml | 28 ++
.../ratis/statemachine/impl/BaseStateMachine.java | 4 +-
32 files changed, 1747 insertions(+), 117 deletions(-)
diff --git a/ratis-logservice/pom.xml b/ratis-logservice/pom.xml
index f180024..c1f151c 100644
--- a/ratis-logservice/pom.xml
+++ b/ratis-logservice/pom.xml
@@ -181,6 +181,14 @@
<artifactId>ratis-netty</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
+
+ <!-- Hadoop dependencies -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
<!-- CLI dependencies -->
<dependency>
<groupId>com.beust</groupId>
@@ -253,5 +261,36 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- Needed for Archive log reader and writer testing-->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
new file mode 100644
index 0000000..1eb1b4f
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogReader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.api;
+
+import org.apache.ratis.logservice.server.RaftLogReader;
+/**
+ * Archive Log Reader implementation. This class is not thread-safe
+ *
+ */
+public interface ArchiveLogReader extends RaftLogReader, LogReader {
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
new file mode 100644
index 0000000..98038a1
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/ArchiveLogWriter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.api;
+
+import java.io.IOException;
+
+public interface ArchiveLogWriter extends LogWriter{
+
+ /**
+ * Initializes the writer
+ * @param location archival location
+ * @param logName
+ * @throws IOException
+ */
+ void init(String location, LogName logName) throws IOException;
+
+ /**
+ * Rolls writer after number of records written crosses threshold
+ * {@link
org.apache.ratis.logservice.server.LogStateMachine#DEFAULT_ARCHIVE_THRESHOLD_PER_FILE}
+ *
+ * @throws IOException
+ */
+ void rollWriter() throws IOException;
+
+ /**
+ * Record Id of the last written record
+ * @return
+ */
+ long getLastWrittenRecordId() throws IOException;
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
index b3bb618..2119fab 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogReader.java
@@ -29,7 +29,7 @@ public interface LogReader extends AutoCloseable {
/**
* Seeks to the position before the record at the provided {@code recordId}
in the LogStream.
*
- * @param offset A non-negative, offset in the LogStream
+ * @param recordId A non-negative, offset in the LogStream
* @return A future for when the operation is completed.
*/
void seek(long recordId) throws IOException;
@@ -79,7 +79,7 @@ public interface LogReader extends AutoCloseable {
/**
* Returns the current position of this Reader. The position is a {@code
recordId}.
*/
- long getPosition();
+ long getPosition() throws IOException;
/**
* Overrides {@link #close()} in {@link AutoCloseable} to throw an
IOException.
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
index f1123af..38d4159 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
@@ -31,8 +31,26 @@ public interface LogStream extends AutoCloseable{
* An enumeration that defines the current state of a LogStream
*/
public enum State {
+ /*
+ Log is open to receive writes/read request
+ */
OPEN,
- CLOSED;
+ /*
+ Log is closed and can not be written but available for read or archival or
export
+ */
+ CLOSED,
+ /*
+ Log is currently archiving but still readable and can be exported to a
different locations
+ */
+ ARCHIVING,
+ /*
+ Log is archiving and available for read only
+ */
+ ARCHIVED,
+ /*
+ Log is deleted so not available for read or write or any other operation
+ */
+ DELETED;
}
/**
@@ -43,7 +61,7 @@ public interface LogStream extends AutoCloseable{
/**
* Returns the current state of this log.
*/
- State getState();
+ State getState() throws IOException;
/**
* Returns the size of this LogStream in bytes.
@@ -62,7 +80,7 @@ public interface LogStream extends AutoCloseable{
*
* @return A synchronous reader
*/
- LogReader createReader();
+ LogReader createReader() throws IOException;
/**
* Creates a write to write to this LogStream.
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
index 7896249..05b9292 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java
@@ -26,8 +26,13 @@ import
org.apache.ratis.logservice.api.LogServiceConfiguration;
import org.apache.ratis.logservice.api.LogStream;
import org.apache.ratis.logservice.api.LogStream.State;
import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.impl.ArchivedLogStreamImpl;
+import org.apache.ratis.logservice.impl.ExportedLogStreamImpl;
import org.apache.ratis.logservice.impl.LogStreamImpl;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.server.ArchivalInfo;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
import org.apache.ratis.protocol.*;
@@ -85,8 +90,9 @@ public class LogServiceClient implements AutoCloseable {
*/
public LogStream createLog(LogName logName) throws IOException {
RaftClientReply reply = client.sendReadOnly(
- () ->
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
- CreateLogReplyProto message =
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
+ () ->
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
+ CreateLogReplyProto message =
+ CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
if (message.hasException()) {
throw
MetaServiceProtoUtil.toMetaServiceException(message.getException());
}
@@ -101,16 +107,46 @@ public class LogServiceClient implements AutoCloseable {
* @throws IOException
*/
public LogStream getLog(LogName logName) throws IOException {
- RaftClientReply reply = client.sendReadOnly
- (() ->
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
- GetLogReplyProto message =
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
- if(message.hasException()) {
- throw
MetaServiceProtoUtil.toMetaServiceException(message.getException());
- }
- LogInfo info = MetaServiceProtoUtil.toLogInfo(message.getLog());
- return new LogStreamImpl(logName, getRaftClient(info), config);
+ return new LogStreamImpl(logName, getRaftClient(getLogInfo(logName)),
config);
+ }
+
+ /**
+ * Get Archive log .
+ * @param logName the name of the log to get
+ * @return
+ * @throws IOException
+ */
+ public LogStream getArchivedLog(LogName logName) throws IOException {
+ return new ArchivedLogStreamImpl(logName, config);
}
+ /**
+ * Get exported log .
+ * @param logName the name of the log to get
+ * @param location location of the exported log
+ * @return
+ * @throws IOException
+ */
+
+ public LogStream getExportLog(LogName logName, String location) throws
IOException {
+ return new ExportedLogStreamImpl(logName, location);
+ }
+
+ public List<ArchivalInfo> getExportStatus(LogName logName) throws
IOException {
+ try (RaftClient client = getRaftClient(getLogInfo(logName))) {
+ RaftClientReply exportInfoReply = client.sendReadOnly(
+ () ->
LogServiceProtoUtil.toExportInfoRequestProto(logName).toByteString());
+ LogServiceProtos.GetExportInfoReplyProto message =
+ LogServiceProtos.GetExportInfoReplyProto
+ .parseFrom(exportInfoReply.getMessage().getContent());
+ if (message.hasException()) {
+ throw new IOException(message.getException().getErrorMsg());
+ }
+ return message.getInfoList().stream()
+ .map(infoProto -> LogServiceProtoUtil.toExportInfo(infoProto))
+ .collect(Collectors.toList());
+ }
+ }
public void deleteLog(LogName logName) throws IOException {
RaftClientReply reply = client.sendReadOnly
@@ -149,20 +185,62 @@ public class LogServiceClient implements AutoCloseable {
* @param logInfo
* @return
*/
- private RaftClient getRaftClient(LogInfo logInfo) {
+ private RaftClient getRaftClient(LogInfo logInfo) throws IOException {
+
RaftProperties properties = new RaftProperties();
return
RaftClient.newBuilder().setRaftGroup(logInfo.getRaftGroup()).setProperties(properties).build();
}
+ private LogInfo getLogInfo(LogName logName) throws IOException {
+ RaftClientReply reply = client.sendReadOnly(
+ () ->
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
+ GetLogReplyProto message =
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
+ if (message.hasException()) {
+ throw
MetaServiceProtoUtil.toMetaServiceException(message.getException());
+ }
+ return MetaServiceProtoUtil.toLogInfo(message.getLog());
+ }
+
/**
- * Archives the given log out of the state machine and into a configurable
long-term storage. A log must be
- * in {@link State#CLOSED} to archive it.
+ * Archives the given log out of the state machine and into a configurable
long-term storage.
+ * A log must be in {@link State#CLOSED} to archive it.
+ * Archiving of the log will happen asynchronously from the client,
+ * The call will return immediately after adding a request for archiving
log
+ * to the respective quorum
*
- * @param name The name of the log to archive.
+ * Client can check the status of Archiving by calling getState() Method
+ *
+ * @param logName The name of the log to archive.
+ */
+ public void archiveLog(LogName logName) throws IOException {
+ exportLog(logName, null, 0);
+ }
+
+ /**
+ * Export the given log out of the state machine and into a provided
location on the configured storage
+ * A log must be in {@link State#CLOSED} to export it.
+ * exporting of the log will happen asynchronously from the client,
+ * The call will return immediately after adding a request for archiving
log
+ * to the respective quorum
+ *
+ * Client can check the status of export by calling getState() Method
+ *
+ * @param logName The name of the log to archive.
*/
- void archiveLog(LogName name) throws IOException {
- // TODO: write me
+ public void exportLog(LogName logName, String location, long recordId)
throws IOException {
+ try (RaftClient client = getRaftClient(getLogInfo(logName))) {
+ RaftClientReply archiveLogReply = client.sendReadOnly(() ->
LogServiceProtoUtil
+ .toArchiveLogRequestProto(logName, location, recordId,
+ location == null ? true : false,
ArchivalInfo.ArchivalStatus.SUBMITTED)
+ .toByteString());
+ LogServiceProtos.ArchiveLogReplyProto archiveMessage =
+ LogServiceProtos.ArchiveLogReplyProto
+ .parseFrom(archiveLogReply.getMessage().getContent());
+ if (archiveMessage.hasException()) {
+ throw new
IOException(archiveMessage.getException().getErrorMsg());
+ }
+ }
}
/**
@@ -172,8 +250,14 @@ public class LogServiceClient implements AutoCloseable {
* @param name The name of the log to close
*/
// TODO this name sucks, confusion WRT the Java Closeable interface.
- void closeLog(LogName name) throws IOException {
- //TODO: write me
+ public void closeLog(LogName name) throws IOException {
+ try (RaftClient client = getRaftClient(getLogInfo(name))) {
+ RaftClientReply reply = client.send(
+ () -> LogServiceProtoUtil.toChangeStateRequestProto(name,
State.CLOSED)
+ .toByteString());
+ LogServiceProtos.ChangeStateReplyProto message =
+
LogServiceProtos.ChangeStateReplyProto.parseFrom(reply.getMessage().getContent());
+ }
}
/**
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
index 7366514..be46177 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java
@@ -40,6 +40,7 @@ public class Constants {
public static final String LOG_SERVICE_META_SERVER_GROUPID_KEY =
"logservice.metaserver.groupid";
public static final String LOG_SERVICE_LOG_SERVER_GROUPID_KEY =
"logservice.logserver.groupid";
+ public static final String LOG_SERVICE_ARCHIVAL_LOCATION_KEY =
"logservice.archival.location";
/*
* Raft properties
*/
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
new file mode 100644
index 0000000..1d5a403
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogReader.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArchiveHdfsLogReader implements ArchiveLogReader {
+ public static final Logger LOG =
LoggerFactory.getLogger(ArchiveHdfsLogReader.class);
+ private long fileLength;
+ private List<FileStatus> files;
+ private FileSystem hdfs;
+ private FSDataInputStream is;
+ private byte[] currentRecord;
+ private int fileCounter = 0;
+ private int currentRecordId;
+
+ public ArchiveHdfsLogReader(String archiveLocation) throws IOException {
+ this(new Configuration(), archiveLocation);
+ }
+
+ public ArchiveHdfsLogReader(Configuration configuration, String
archiveLocation)
+ throws IOException {
+ this.hdfs = FileSystem.get(configuration);
+ Path archiveLocationPath = new Path(archiveLocation);
+ if (!hdfs.exists(archiveLocationPath)) {
+ throw new FileNotFoundException(archiveLocation);
+ }
+ files = Arrays.asList(hdfs.listStatus(archiveLocationPath));
+ if (files.size() > 0) {
+ Collections.sort(files, new Comparator<FileStatus>() {
+ @Override public int compare(FileStatus o1, FileStatus o2) {
+ //ascending order
+ //currently written file (without _recordId_) will be sorted at the
last
+ return LogServiceUtils.getRecordIdFromRolledArchiveFile(o1.getPath())
+
.compareTo(LogServiceUtils.getRecordIdFromRolledArchiveFile(o2.getPath()));
+ }
+ });
+ openNextFilePath();
+ loadNext();
+ }
+ }
+
+ private Path openNextFilePath() throws IOException {
+ Path filePath = files.get(fileCounter).getPath();
+ this.is = this.hdfs.open(filePath);
+ this.fileLength = this.hdfs.getFileStatus(filePath).getLen();
+ fileCounter++;
+ return filePath;
+
+ }
+
+ @Override public void seek(long recordId) throws IOException {
+ while (currentRecordId < recordId && hasNext()) {
+ next();
+ }
+ }
+
+ @Override public boolean hasNext() throws IOException {
+ return currentRecord != null;
+ }
+
+ @Override public byte[] next() throws IOException {
+ byte[] current = currentRecord;
+ currentRecord = null;
+ if (current != null) {
+ currentRecordId++;
+ }
+ loadNext();
+ return current;
+ }
+
+ @Override public long getCurrentRaftIndex() {
+ throw new UnsupportedOperationException(
+ "getCurrentRaftIndex() is not supported for archive hdfs log reader");
+ }
+
+ @Override public ByteBuffer readNext() throws IOException {
+ byte[] current = next();
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ return ByteBuffer.wrap(current);
+ }
+
+ private int readLength() throws IOException {
+ int length;
+ try {
+ length = is.readInt();
+ } catch (EOFException e) {
+ if (files.size() <= fileCounter) {
+ LOG.trace("EOF and no more file to read, throwing back", e);
+ throw e;
+ } else {
+ LOG.trace("EOF.. Opening next file: {}!!",
files.get(fileCounter).getPath());
+ openNextFilePath();
+ length = is.readInt();
+ }
+ }
+ return length;
+ }
+
+ @Override public void readNext(ByteBuffer buffer) throws IOException {
+ Preconditions.checkNotNull(buffer, "buffer is NULL");
+ byte[] current = next();
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ buffer.put(current);
+ }
+
+
+ @Override public List<ByteBuffer> readBulk(int numRecords) throws
IOException {
+ Preconditions.checkArgument(numRecords > 0, "number of records must be
greater than 0");
+ List<ByteBuffer> ret = new ArrayList<ByteBuffer>();
+ try {
+
+ for (int i = 0; i < numRecords; i++) {
+ ByteBuffer buffer = readNext();
+ ret.add(buffer);
+ }
+
+ } catch (EOFException eof) {
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ return ret;
+ }
+ }
+
+ @Override public int readBulk(ByteBuffer[] buffers) throws IOException {
+ Preconditions.checkNotNull(buffers, "list of buffers is NULL");
+ Preconditions.checkArgument(buffers.length > 0, "list of buffers is
empty");
+ int count = 0;
+ try {
+ for (int i = 0; i < buffers.length; i++) {
+ readNext(buffers[i]);
+ count++;
+ }
+ } catch (EOFException eof) {
+
+ }
+ return count;
+ }
+
+ @Override public long getPosition() throws IOException {
+ return currentRecordId;
+ }
+
+ @Override public void close() throws IOException {
+ if (this.is != null) {
+ this.is.close();
+ this.is = null;
+ }
+ }
+
+ private void loadNext() throws IOException {
+ int length;
+ try {
+ length = readLength();
+ } catch (EOFException e) {
+ currentRecord = null;
+ return;
+ }
+ byte[] bytes = new byte[length];
+ if (is.read(bytes) != length) {
+ throw new EOFException(
+ "File seems to be corrupted, Encountered EOF before reading the
complete record");
+ }
+ currentRecord = bytes;
+ }
+
+ //Only for testing
+ public List<FileStatus> getFiles(){
+ return files;
+ }
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
new file mode 100644
index 0000000..e91cd86
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchiveHdfsLogWriter.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.apache.ratis.util.LogUtils;
+
+public class ArchiveHdfsLogWriter implements ArchiveLogWriter {
+ private final Configuration configuration;
+ private FileSystem hdfs;
+ private FSDataOutputStream os;
+ private Path currentPath;
+ private long currentRecordId;
+ private long lastRollRecordId;
+
+ public ArchiveHdfsLogWriter(Configuration conf) {
+ this.configuration = conf;
+ }
+
+ public ArchiveHdfsLogWriter() {
+ this.configuration = new Configuration();
+ }
+
+ @Override public void init(String archiveLocation, LogName logName) throws
IOException {
+ hdfs = FileSystem.get(configuration);
+ Path loc = new
Path(LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+ if (!hdfs.exists(loc)) {
+ hdfs.mkdirs(loc);
+ }
+ currentPath = new Path(loc, logName.getName());
+ os = hdfs.create(currentPath, true);
+ }
+
+ @Override public long write(ByteBuffer buffer) throws IOException {
+ if (buffer.hasArray()) {
+ int startIndex = buffer.arrayOffset();
+ int curIndex = buffer.arrayOffset() + buffer.position();
+ int endIndex = curIndex + buffer.remaining();
+ int length = endIndex - startIndex;
+ os.writeInt(length);
+ os.write(buffer.array(), startIndex, length);
+ } else {
+ throw new IllegalArgumentException(
+ "Currently array backed byte buffer is only supported for archive
write !!");
+ }
+ currentRecordId++;
+ return currentRecordId;
+ }
+
+ @Override public List<Long> write(List<ByteBuffer> records) throws
IOException {
+ List<Long> list = new ArrayList<Long>();
+ for (ByteBuffer record : records) {
+ list.add(write(record));
+ }
+ return list;
+ }
+
+ @Override public long sync() throws IOException {
+ return 0;
+ }
+
+ @Override public void close() throws IOException {
+ os.close();
+ if (lastRollRecordId != currentRecordId) {
+ hdfs.rename(currentPath, new Path(currentPath + "_recordId_" +
currentRecordId));
+ }
+ }
+
+ @Override public void rollWriter() throws IOException {
+ if (lastRollRecordId != currentRecordId) {
+ //close old file
+ os.close();
+ hdfs.rename(currentPath,
+ new Path(LogServiceUtils.getRolledPathForArchiveWriter(currentPath,
currentRecordId)));
+ lastRollRecordId = currentRecordId;
+ //create new file
+ os = hdfs.create(currentPath, true);
+ }
+ }
+
+ @Override public long getLastWrittenRecordId() throws IOException {
+ os.hflush();
+ return currentRecordId;
+ }
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
new file mode 100644
index 0000000..682d6c1
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ArchivedLogStreamImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.api.LogServiceConfiguration;
+import org.apache.ratis.logservice.api.LogStream;
+import org.apache.ratis.logservice.api.LogWriter;
+import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ArchivedLogStreamImpl implements LogStream {
+ public static final Logger LOG =
LoggerFactory.getLogger(ArchivedLogStreamImpl.class);
+ /*
+ * Directory of the archived files
+ */
+ private String location;
+
+ /*
+ * Log stream name
+ */
+ LogName name;
+ /*
+ * Log stream configuration
+ */
+ LogServiceConfiguration config;
+ /*
+ * State
+ */
+ State state;
+
+
+ public ArchivedLogStreamImpl(LogName name, LogServiceConfiguration config) {
+ this(name, config.get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY));
+ if(config!=null) {
+ this.config = config;
+ }
+ init();
+ }
+
+ protected ArchivedLogStreamImpl(LogName name, String location) {
+ this.name = name;
+ this.location = location;
+ }
+
+ protected void init() {
+ state = State.ARCHIVED;
+ }
+
+ @Override
+ public LogName getName() {
+ return name;
+ }
+
+ @Override public State getState() throws IOException {
+ return state;
+ }
+
+ @Override
+ public long getSize() throws IOException{
+ throw new UnsupportedOperationException("getSize()");
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ throw new UnsupportedOperationException("getLength()");
+ }
+
+ @Override
+ public ArchiveLogReader createReader() throws IOException {
+ return new
ArchiveHdfsLogReader(LogServiceUtils.getArchiveLocationForLog(location, name));
+ }
+
+ @Override
+ public LogWriter createWriter() {
+ throw new UnsupportedOperationException("Archived log cannot be written");
+ }
+
+ @Override
+ public long getLastRecordId() throws IOException {
+ throw new UnsupportedOperationException("getLastRecordId()");
+ }
+
+ @Override
+ public long getStartRecordId() throws IOException {
+ throw new UnsupportedOperationException("getStartRecordId()");
+ }
+
+ @Override public Collection<RecordListener> getRecordListeners() {
+ throw new UnsupportedOperationException("get record listeners");
+ }
+
+ @Override
+ public LogServiceConfiguration getConfiguration() {
+ return config;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override public void addRecordListener(RecordListener listener) {
+ throw new UnsupportedOperationException("Add record listener");
+ }
+
+ @Override public boolean removeRecordListener(RecordListener listener) {
+ throw new UnsupportedOperationException("remove record listener");
+ }
+
+ @Override
+ public RaftClient getRaftClient() {
+ throw new UnsupportedOperationException("getRaftClient()");
+ }
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
new file mode 100644
index 0000000..ae9a7c2
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/ExportedLogStreamImpl.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.impl;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportedLogStreamImpl extends ArchivedLogStreamImpl {
+ public static final Logger LOG =
LoggerFactory.getLogger(ExportedLogStreamImpl.class);
+
+ public ExportedLogStreamImpl(LogName name, String location) {
+ super(name, location);
+ }
+
+ @Override
+ protected void init() {
+ state = State.CLOSED;
+ }
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 3394ea6..655fc95 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -30,6 +30,7 @@ import
org.apache.ratis.logservice.api.LogServiceConfiguration;
import org.apache.ratis.logservice.api.LogStream;
import org.apache.ratis.logservice.api.LogWriter;
import org.apache.ratis.logservice.api.RecordListener;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLastCommittedIndexReplyProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthReplyProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeReplyProto;
@@ -96,9 +97,12 @@ public class LogStreamImpl implements LogStream {
return name;
}
- @Override
- public State getState() {
- return state;
+ @Override public State getState() throws IOException {
+ RaftClientReply reply = raftClient.sendReadOnly(
+
Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name).toByteString()));
+ LogServiceProtos.GetStateReplyProto proto =
+
LogServiceProtos.GetStateReplyProto.parseFrom(reply.getMessage().getContent());
+ return State.valueOf(proto.getState().name());
}
@Override
@@ -130,7 +134,7 @@ public class LogStreamImpl implements LogStream {
}
@Override
- public LogReader createReader() {
+ public LogReader createReader() throws IOException{
return new LogReaderImpl(this);
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
new file mode 100644
index 0000000..0c2b460
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ArchivalInfo.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
+import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+
+public class ArchivalInfo {
+
+ public enum ArchivalStatus{
+ /*
+ Initial state when the archival/export request is submitted and request
+ is recorded at the leader but the single thread responsible for archiving
+ has not started processing it
+ */
+ SUBMITTED,
+ /*
+ Archiving/exporting of the particular request has been started and
+ file will appear soon in archival location during this state
+ */
+ STARTED,
+ /*
+ Archival is ongoing and at least one file is rolled as well
+ */
+ RUNNING,
+ /*
+ Archiving on the current leader will get interrupted
+ when it become a follower after re-election.
+ and a request to new leader will be submitted after some delay
+ to avoid leader election storm, if a new request fails , archival
+ status will be changed to FAILED and log state back to CLOSED so that
+ user can submit request again
+ */
+ INTERRUPTED,
+ /*
+ Archival/export request is successfully completed
+ */
+ COMPLETED,
+ /*
+ Archival/export request is failed due to the error,
+ worker logs should have trace for it
+ After fixing the issue , archival request can be resubmitted
+ */
+ FAILED
+ }
+ private String archiveLocation;
+ private LogName archiveLogName;
+ private long lastArchivedIndex;
+ private ArchivalStatus status;
+
+ public ArchivalInfo(String location) {
+ this.archiveLocation = location;
+ }
+
+
+ public ArchivalInfo
updateArchivalInfo(LogServiceProtos.ArchiveLogRequestProto archiveLog) {
+ this.archiveLogName =
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ this.lastArchivedIndex = archiveLog.getLastArchivedRaftIndex();
+ this.status = ArchivalStatus.valueOf(archiveLog.getStatus().name());
+ return this;
+ }
+
+ public String getArchiveLocation() {
+ return archiveLocation;
+ }
+
+ public LogName getArchiveLogName() {
+ return archiveLogName;
+ }
+
+ public long getLastArchivedIndex() {
+ return lastArchivedIndex;
+ }
+
+ public ArchivalStatus getStatus(){
+ return status;
+ }
+
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 1596e72..50da769 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -80,6 +80,10 @@ public class LogServer extends BaseServer {
long segmentSize =
getConfig().getLong(Constants.RATIS_RAFT_SEGMENT_SIZE_KEY,
Constants.DEFAULT_RATIS_RAFT_SEGMENT_SIZE);
SizeInBytes segmentSizeBytes = SizeInBytes.valueOf(segmentSize);
+ String archiveLocation =
getConfig().get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY);
+ if (archiveLocation != null) {
+ properties.set(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY,
archiveLocation);
+ }
RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSizeBytes);
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
segmentSizeBytes);
@@ -117,7 +121,7 @@ public class LogServer extends BaseServer {
if(raftGroupId.equals(logServerGroupId)) {
return new ManagementStateMachine();
}
- return new LogStateMachine();
+ return new LogStateMachine(properties);
}
})
.setProperties(properties)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
index 8c6e00d..b3b85ec 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServiceRaftLogReader.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
* A reader for the {@link RaftLog} which is accessed using LogService
recordId's instead
* of Raft log indexes. Not thread-safe.
*/
-public class LogServiceRaftLogReader {
+public class LogServiceRaftLogReader implements RaftLogReader{
private static final Logger LOG =
LoggerFactory.getLogger(LogServiceRaftLogReader.class);
private final RaftLog raftLog;
@@ -55,6 +55,7 @@ public class LogServiceRaftLogReader {
* Positions this reader just before the current recordId. Use {@link
#next()} to get that
* element, but take care to check if a value is present using {@link
#hasNext()} first.
*/
+ @Override
public void seek(long recordId) throws RaftLogIOException,
InvalidProtocolBufferException {
LOG.trace("Seeking to recordId={}", recordId);
// RaftLog starting index
@@ -75,6 +76,7 @@ public class LogServiceRaftLogReader {
/**
* Returns true if there is a log entry to read.
*/
+ @Override
public boolean hasNext() throws RaftLogIOException,
InvalidProtocolBufferException {
return currentRecord != null;
}
@@ -83,14 +85,15 @@ public class LogServiceRaftLogReader {
* Returns the next log entry. Ensure {@link #hasNext()} returns true before
* calling this method.
*/
- public ByteString next() throws RaftLogIOException,
InvalidProtocolBufferException {
+ @Override
+ public byte[] next() throws RaftLogIOException,
InvalidProtocolBufferException {
if (currentRecord == null) {
throw new NoSuchElementException();
}
ByteString current = currentRecord;
currentRecord = null;
loadNext();
- return current;
+ return current.toByteArray();
}
/**
@@ -159,4 +162,9 @@ public class LogServiceRaftLogReader {
}
// If we make it here, we've read off the end of the RaftLog.
}
+
+ @Override
+ public long getCurrentRaftIndex(){
+ return currentRaftIndex;
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 9185641..6b631a0 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.logservice.server;
+import static org.apache.ratis.logservice.api.LogStream.State;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -25,29 +27,49 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
-import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.common.Constants;
+import org.apache.ratis.logservice.impl.ArchiveHdfsLogReader;
+import org.apache.ratis.logservice.impl.ArchiveHdfsLogWriter;
import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
+import org.apache.ratis.logservice.proto.LogServiceProtos;
import
org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
-import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogReplyProto;
-import org.apache.ratis.logservice.proto.LogServiceProtos.CloseLogRequestProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthRequestProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.GetStateRequestProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.ReadLogRequestProto;
import org.apache.ratis.logservice.util.LogServiceProtoUtil;
+import org.apache.ratis.logservice.util.LogServiceUtils;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.logservice.server.ArchivalInfo.ArchivalStatus;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.ServerState;
@@ -66,6 +88,8 @@ import org.slf4j.LoggerFactory;
public class LogStateMachine extends BaseStateMachine {
public static final Logger LOG =
LoggerFactory.getLogger(LogStateMachine.class);
+ public static final long DEFAULT_ARCHIVE_THRESHOLD_PER_FILE = 1000000;
+ private final RaftProperties properties;
private RatisMetricRegistry metricRegistry;
private Timer sizeRequestTimer;
private Timer readNextQueryTimer;
@@ -75,12 +99,10 @@ public class LogStateMachine extends BaseStateMachine {
private Timer startIndexTimer;
private Timer appendRequestTimer;
private Timer syncRequesTimer;
+ private Timer archiveLogRequestTimer;
private Timer getCloseLogTimer;
-
- public static enum State {
- OPEN, CLOSED
- }
-
+ private RaftClient client;
+ //Archival information
/*
* State is a log's length, size, and state (closed/open);
*/
@@ -99,9 +121,18 @@ public class LogStateMachine extends BaseStateMachine {
private RaftLog log;
- private RaftGroupId groupId;
private RaftServerProxy proxy ;
+ private ExecutorService executorService;
+ private boolean isArchivalRequest;
+ private ArchivalInfo archivalInfo;
+ private Map<String,ArchivalInfo> exportMap = new
HashMap<String,ArchivalInfo>();
+ private Map<String, Future<Boolean>> archiveExportFutures = new HashMap<>();
+ private Timer archiveLogTimer;
+
+ public LogStateMachine(RaftProperties properties) {
+ this.properties = properties;
+ }
private AutoCloseableLock readLock() {
return AutoCloseableLock.acquire(lock.readLock());
@@ -126,7 +157,6 @@ public class LogStateMachine extends BaseStateMachine {
super.initialize(server, groupId, raftStorage);
this.storage.init(raftStorage);
this.proxy = (RaftServerProxy) server;
- this.groupId = groupId;
//TODO: using groupId for metric now but better to tag it with LogName
this.metricRegistry = LogServiceMetricsRegistry
.createMetricRegistryForLogService(groupId.toString(),
server.getId().toString());
@@ -139,7 +169,15 @@ public class LogStateMachine extends BaseStateMachine {
this.syncRequesTimer = metricRegistry.timer("syncRequesTime");
this.appendRequestTimer = metricRegistry.timer("appendRequestTime");
this.getCloseLogTimer = metricRegistry.timer("getCloseLogTime");
+ //archiving request time not the actual archiving time
+ this.archiveLogRequestTimer =
metricRegistry.timer("archiveLogRequestTime");
+ this.archiveLogTimer = metricRegistry.timer("archiveLogTime");
loadSnapshot(storage.getLatestSnapshot());
+ executorService = Executors.newSingleThreadExecutor();
+ this.archivalInfo =
+ new
ArchivalInfo(properties.get(Constants.LOG_SERVICE_ARCHIVAL_LOCATION_KEY));
+
+
}
private void checkInitialization() throws IOException {
@@ -266,7 +304,14 @@ public class LogStateMachine extends BaseStateMachine {
return processGetLengthRequest(logServiceRequestProto);
}
});
- default:
+ case ARCHIVELOG:
+ return recordTime(archiveLogRequestTimer, new Task(){
+ @Override public CompletableFuture<Message> run() {
+ return processArchiveLog(logServiceRequestProto);
+ }});
+ case EXPORTINFO:
+ return processExportInfo(logServiceRequestProto);
+ default:
// TODO
throw new RuntimeException(
"Wrong message type for query: " +
logServiceRequestProto.getRequestCase());
@@ -279,6 +324,19 @@ public class LogStateMachine extends BaseStateMachine {
}
+ private CompletableFuture<Message> processExportInfo(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.GetExportInfoRequestProto exportInfoRequestProto =
+ logServiceRequestProto.getExportInfo();
+ LogServiceProtos.GetExportInfoReplyProto.Builder exportBuilder =
+ LogServiceProtos.GetExportInfoReplyProto.newBuilder();
+ exportMap.values().stream().map(
+ archivalInfo ->
exportBuilder.addInfo(LogServiceProtoUtil.toExportInfoProto(archivalInfo)))
+ .collect(Collectors.toList());
+
+ return
CompletableFuture.completedFuture(Message.valueOf(exportBuilder.build().toByteString()));
+ }
+
/**
* Process get start index request
* @param proto message
@@ -302,7 +360,6 @@ public class LogStateMachine extends BaseStateMachine {
private CompletableFuture<Message>
processGetLastCommittedIndexRequest(LogServiceRequestProto proto)
{
-
Throwable t = verifyState(State.OPEN);
long lastIndex = log.getLastCommittedIndex();
return CompletableFuture.completedFuture(Message
@@ -340,19 +397,34 @@ public class LogStateMachine extends BaseStateMachine {
long startRecordId = msgProto.getStartRecordId();
// And the number of records they want to read
int numRecordsToRead = msgProto.getNumRecords();
- Throwable t = verifyState(State.OPEN);
+ //Log must have been closed while Archiving , so we can let user only to
+ // read when the log is either OPEN or ARCHIVED
+ Throwable t = verifyState(State.OPEN, State.ARCHIVING, State.CLOSED,
State.ARCHIVED);
List<byte[]> list = null;
if (t == null) {
- LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
- list = new ArrayList<byte[]>();
+ RaftLogReader reader = null;
try {
- reader.seek(startRecordId);
- for (int i = 0; i < numRecordsToRead; i++) {
- if (!reader.hasNext()) {
- break;
+ if (this.state == State.OPEN || this.state == State.CLOSED
+ || this.state == State.ARCHIVING) {
+ reader = new LogServiceRaftLogReader(log);
+ } else if (this.state == State.ARCHIVED) {
+ reader = new ArchiveHdfsLogReader(LogServiceUtils
+ .getArchiveLocationForLog(archivalInfo.getArchiveLocation(),
+ archivalInfo.getArchiveLogName()));
+ } else {
+ //could be a race condition
+ t = verifyState(State.OPEN, State.ARCHIVED);
+ }
+ if (t == null && reader != null) {
+ list = new ArrayList<byte[]>();
+ reader.seek(startRecordId);
+ for (int i = 0; i < numRecordsToRead; i++) {
+ if (!reader.hasNext()) {
+ break;
+ }
+ list.add(reader.next());
}
- list.add(reader.next().toByteArray());
}
} catch (Exception e) {
LOG.error("Failed to execute ReadNextQuery", e);
@@ -425,10 +497,10 @@ public class LogStateMachine extends BaseStateMachine {
LogServiceRequestProto logServiceRequestProto =
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
switch (logServiceRequestProto.getRequestCase()) {
- case CLOSELOG:
+ case CHANGESTATE:
return recordTime(getCloseLogTimer, new Task(){
@Override public CompletableFuture<Message> run() {
- return processCloseLog(logServiceRequestProto);
+ return processChangeState(logServiceRequestProto);
}});
case APPENDREQUEST:
return recordTime(appendRequestTimer, new Task(){
@@ -440,6 +512,8 @@ public class LogStateMachine extends BaseStateMachine {
@Override public CompletableFuture<Message> run() {
return processSyncRequest(trx, logServiceRequestProto);
}});
+ case ARCHIVELOG:
+ return updateArchiveLogInfo(logServiceRequestProto);
default:
//TODO
return null;
@@ -452,29 +526,256 @@ public class LogStateMachine extends BaseStateMachine {
- private CompletableFuture<Message> processCloseLog(LogServiceRequestProto
logServiceRequestProto) {
- CloseLogRequestProto closeLog = logServiceRequestProto.getCloseLog();
+ private CompletableFuture<Message> processChangeState(LogServiceRequestProto
logServiceRequestProto) {
+ LogServiceProtos.ChangeStateLogRequestProto changeState =
logServiceRequestProto.getChangeState();
// Need to check whether the file is opened if opened close it.
// TODO need to handle exceptions while operating with files.
+
+ State targetState = State.valueOf(changeState.getState().name());
+ //if forced skip checking states
+ if(!changeState.getForce()) {
+ switch (targetState) {
+ case OPEN:
+ if (state != null) {
+ verifyState(State.OPEN, State.CLOSED);
+ }
+ break;
+ case CLOSED:
+ verifyState(State.OPEN);
+ break;
+ case ARCHIVED:
+ verifyState(State.ARCHIVING);
+ break;
+ case ARCHIVING:
+ verifyState(State.CLOSED);
+ break;
+ case DELETED:
+ verifyState(State.CLOSED);
+ break;
+ }
+ }
+ this.state = targetState;
return CompletableFuture.completedFuture(Message
- .valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
+
.valueOf(LogServiceProtos.ChangeStateReplyProto.newBuilder().build().toByteString()));
}
-
-
private CompletableFuture<Message> processGetStateRequest(
LogServiceRequestProto logServiceRequestProto) {
GetStateRequestProto getState = logServiceRequestProto.getGetState();
- return
CompletableFuture.completedFuture(Message.valueOf(LogServiceProtoUtil
- .toGetStateReplyProto(state == State.OPEN).toByteString()));
+ return CompletableFuture.completedFuture(Message
+
.valueOf(LogServiceProtoUtil.toGetStateReplyProto(state).toByteString()));
}
- private Throwable verifyState(State state) {
- if (this.state != state) {
- return new IOException("Wrong state: " + this.state);
- }
+ private Throwable verifyState(State... states) {
+ for (State state : states) {
+ if (this.state == state) {
return null;
- }
+ }
+ }
+ return new IOException("Wrong state: " + this.state);
+ }
+ private CompletableFuture<Message> updateArchiveLogInfo(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ this.isArchivalRequest = !archiveLog.getIsExport();
+ Throwable t = null;
+ if(isArchivalRequest) {
+ archivalInfo.updateArchivalInfo(archiveLog);
+ t = verifyState(State.ARCHIVING);
+ }else{
+ t = verifyState(State.OPEN, State.CLOSED);
+ ArchivalInfo info = exportMap.get(archiveLog.getLocation());
+ if(info==null) {
+ info = new ArchivalInfo(archiveLog.getLocation());
+ exportMap.put(archiveLog.getLocation(),info);
+ }
+ info.updateArchivalInfo(archiveLog);
+ }
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ }
+
+ private CompletableFuture<Message> processArchiveLog(
+ LogServiceRequestProto logServiceRequestProto) {
+ LogServiceProtos.ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
+ LogName logName = LogServiceProtoUtil.toLogName(archiveLog.getLogName());
+ Throwable t = null;
+ try {
+ String loc = null;
+ this.isArchivalRequest = !archiveLog.getIsExport();
+ if (isArchivalRequest) {
+ loc = archivalInfo.getArchiveLocation();
+ archivalInfo.updateArchivalInfo(archiveLog);
+ } else {
+ loc = archiveLog.getLocation();
+ ArchivalInfo exportInfo =
+ exportMap.putIfAbsent(loc, new ArchivalInfo(loc));
+ if (exportInfo != null && exportInfo.getLastArchivedIndex() ==
archiveLog
+ .getLastArchivedRaftIndex()) {
+ throw new IllegalStateException("Export of " + logName + "for the
given location " + loc
+ + "is already present and in " + exportInfo.getStatus());
+ }
+ exportInfo.updateArchivalInfo(archiveLog);
+ }
+ if (loc == null) {
+ throw new IllegalArgumentException(isArchivalRequest ?
+ "Location for archive is not configured" :
+ "Location for export provided is null");
+ }
+ final String location = loc;
+ long recordId = archiveLog.getLastArchivedRaftIndex();
+ if (isArchivalRequest) {
+ t = verifyState(State.CLOSED);
+ } else {
+ t = verifyState(State.OPEN, State.CLOSED);
+ }
+ if (t == null) {
+ Callable<Boolean> callable = () -> {
+ final Timer.Context timerContext = archiveLogTimer.time();
+ try {
+ startArchival(recordId, logName, location);
+ //Init ArchiveLogWriter for writing in export/archival location
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter();
+ writer.init(location, logName);
+
+ LogServiceRaftLogReader reader = new LogServiceRaftLogReader(log);
+ reader.seek(recordId);
+ long records = 0;
+ boolean isInterrupted = false;
+ while (reader.hasNext()) {
+ writer.write(ByteBuffer.wrap(reader.next()));
+ isInterrupted = Thread.currentThread().isInterrupted();
+ if (records >= DEFAULT_ARCHIVE_THRESHOLD_PER_FILE ||
isInterrupted) {
+ //roll writer when interuppted or no. of records threshold per
file is met
+ commit(writer, logName, location);
+ if (isInterrupted) {
+ break;
+ }
+ records = 0;
+ }
+ records++;
+ }
+ writer.close();
+ if (!isInterrupted) {
+ //It means archival is successfully completed on this leader
+ completeArchival(writer.getLastWrittenRecordId(), logName,
location);
+ } else {
+ //Thread is interuppted either leader is going down or it become
follower
+ try {
+ //Sleeping here before sending archival request to new leader
to
+ // avoid causing problem during leader election storm
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+
sendArchiveLogrequestToNewLeader(writer.getLastWrittenRecordId(), logName,
location);
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("Archival failed for the log:" + logName, e);
+ failArchival(recordId, logName, location);
+ } finally {
+ timerContext.stop();
+ }
+ return false;
+ };
+
+ archiveExportFutures.put(location, executorService.submit(callable));
+ }
+ }catch (Exception e){
+ LOG.warn("Exception while processing archival request for " + logName,
e);
+ t = e;
+ }
+ return CompletableFuture.completedFuture(
+
Message.valueOf(LogServiceProtoUtil.toArchiveLogReplyProto(t).toByteString()));
+ }
+
+ private void failArchival(long recordId, LogName logName, String location)
throws IOException {
+ updateArchivingInfo(recordId, logName, location, isArchivalRequest,
+ ArchivalStatus.FAILED);
+ if(isArchivalRequest) {
+ sendChangeStateRequest(State.CLOSED, true);
+ }
+ }
+
+ private void startArchival(long recordId, LogName logName, String location)
throws IOException {
+ if (isArchivalRequest) {
+ sendChangeStateRequest(State.ARCHIVING, false);
+ }
+ updateArchivingInfo(recordId, logName, location, isArchivalRequest,
ArchivalStatus.STARTED);
+ }
+
+ private void sendArchiveLogrequestToNewLeader(long recordId, LogName
logName, String location)
+ throws IOException {
+ getClient().sendReadOnly(() -> LogServiceProtoUtil
+ .toArchiveLogRequestProto(logName, location, recordId,
isArchivalRequest,
+ ArchivalStatus.INTERRUPTED).toByteString());
+ }
+
+ public void completeArchival(long recordId, LogName logName, String
location) throws IOException {
+ if (isArchivalRequest) {
+ sendChangeStateRequest(State.ARCHIVED, false);
+ }
+ updateArchivingInfo(recordId, logName, location, isArchivalRequest,
ArchivalStatus.COMPLETED);
+ }
+
+ private void commit(ArchiveLogWriter writer, LogName logName, String
location)
+ throws IOException {
+ writer.rollWriter();
+ updateArchivingInfo(writer.getLastWrittenRecordId(), logName, location,
isArchivalRequest,
+ ArchivalStatus.RUNNING);
+ }
+
+ private void updateArchivingInfo(long recordId, LogName logName, String
location,
+ boolean isArchival, ArchivalStatus status)
+ throws IOException {
+ RaftClientReply archiveLogReply = getClient().send(() ->
LogServiceProtoUtil
+ .toArchiveLogRequestProto(logName, location, recordId, isArchival,
status).toByteString());
+ LogServiceProtos.ArchiveLogReplyProto
message=LogServiceProtos.ArchiveLogReplyProto
+ .parseFrom(archiveLogReply.getMessage().getContent());
+ if (message.hasException()) {
+ throw new IOException(message.getException().getErrorMsg());
+ }
+ }
+
+ private void sendChangeStateRequest(State state, boolean force) throws
IOException {
+ getClient().send(
+ () ->
LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), state, force)
+ .toByteString());
+ }
+
+ private RaftClient getClient() throws IOException {
+ if (client == null) {
+ try {
+ RaftServer raftServer = server.get();
+ client =
RaftClient.newBuilder().setRaftGroup(getGroupFromGroupId(raftServer, groupId))
+ .setClientId(ClientId.randomId())
+ .setProperties(raftServer.getProperties()).build();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ return client;
+ }
+
+ private RaftGroup getGroupFromGroupId(RaftServer raftServer, RaftGroupId
raftGroupId)
+ throws IOException {
+ List<RaftGroup> x =
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
+ .filter(group ->
group.getGroupId().equals(raftGroupId)).collect(Collectors.toList());
+ if (x.size() == 1) {
+ return x.get(0);
+ } else {
+ throw new IOException(x.size() + " are group found for group id:" +
raftGroupId);
+ }
+ }
+
+ @Override public void notifyNotLeader(Collection<TransactionContext>
pendingEntries)
+ throws IOException {
+ for(Future<Boolean> archiveFuture:archiveExportFutures.values()) {
+ if (archiveFuture != null && !archiveFuture.isCancelled()) {
+ archiveFuture.cancel(true);
+ }
+ }
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 7caa63a..5e0785b 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -42,8 +42,6 @@ import
org.apache.ratis.logservice.common.LogNotFoundException;
import org.apache.ratis.logservice.common.NoEnoughWorkersException;
import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
-import
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogReplyProto;
-import
org.apache.ratis.logservice.proto.MetaServiceProtos.ArchiveLogRequestProto;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.CreateLogRequestProto;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.DeleteLogRequestProto;
import
org.apache.ratis.logservice.proto.MetaServiceProtos.LogServicePingRequestProto;
@@ -211,8 +209,6 @@ public class MetaStateMachine extends BaseStateMachine {
return processListLogsRequest();
case GETLOG:
return processGetLogRequest(req);
- case ARCHIVELOG:
- return processArchiveLog(req);
case DELETELOG:
return processDeleteLog(req);
default:
@@ -263,6 +259,8 @@ public class MetaStateMachine extends BaseStateMachine {
.setLogname(LogServiceProtoUtil.toLogNameProto(logName)))
.build().toByteString());
} catch (IOException e) {
+ LOG.error(
+ "Exception while unregistring raft group with Metadata
Service during deletion of log");
e.printStackTrace();
}
}
@@ -279,14 +277,6 @@ public class MetaStateMachine extends BaseStateMachine {
//
.valueOf(CloseLogReplyProto.newBuilder().build().toByteString()));
// }
- private CompletableFuture<Message>
- processArchiveLog(MetaServiceProtos.MetaServiceRequestProto
logServiceRequestProto) {
- ArchiveLogRequestProto archiveLog =
logServiceRequestProto.getArchiveLog();
- LogName logName =
LogServiceProtoUtil.toLogName(archiveLog.getLogName());
- // Handle log archiving.
- return CompletableFuture.completedFuture(Message
-
.valueOf(ArchiveLogReplyProto.newBuilder().build().toByteString()));
- }
// private CompletableFuture<Message> processGetStateRequest(
// MetaServiceProtos.MetaServiceRequestProto
logServiceRequestProto) {
@@ -345,6 +335,8 @@ public class MetaStateMachine extends BaseStateMachine {
.toRaftGroupProto(raftGroup)))
.build().toByteString());
} catch (IOException e) {
+ LOG.error(
+ "Exception while registring raft group with Metadata
Service during creation of log");
e.printStackTrace();
}
return
CompletableFuture.completedFuture(Message.valueOf(MetaServiceProtoUtil
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
new file mode 100644
index 0000000..5aa6672
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/RaftLogReader.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.server;
+
+import java.io.IOException;
+
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+public interface RaftLogReader {
+
+ /**
+ * Positions this reader just before the current recordId. Use {@link
#next()} to get that
+ * element, but take care to check if a value is present using {@link
#hasNext()} first.
+ */
+ public void seek(long recordId) throws IOException;
+
+ /**
+ * Returns true if there is a log entry to read.
+ */
+ public boolean hasNext() throws IOException;
+
+ /**
+ * Returns the next log entry. Ensure {@link #hasNext()} returns true before
+ * calling this method.
+ */
+ public byte[] next() throws IOException;
+
+ /**
+ * Returns current raft index read
+ * @return
+ */
+ public long getCurrentRaftIndex();
+
+ }
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
index 873cf6b..319857d 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/CommandFactory.java
@@ -22,9 +22,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import org.apache.ratis.logservice.shell.commands.ArchiveLogCommand;
import org.apache.ratis.logservice.shell.commands.CreateLogCommand;
import org.apache.ratis.logservice.shell.commands.DeleteLogCommand;
import org.apache.ratis.logservice.shell.commands.ExitCommand;
+import org.apache.ratis.logservice.shell.commands.ExportLogCommand;
import org.apache.ratis.logservice.shell.commands.HelpCommand;
import org.apache.ratis.logservice.shell.commands.ListLogsCommand;
import org.apache.ratis.logservice.shell.commands.PutToLogCommand;
@@ -46,6 +48,8 @@ public class CommandFactory {
commands.put("quit", exitCommand);
commands.put("help", new HelpCommand());
commands.put("list", new ListLogsCommand());
+ commands.put("archive", new ArchiveLogCommand());
+ commands.put("export", new ExportLogCommand());
return Collections.unmodifiableMap(commands);
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
new file mode 100644
index 0000000..11e60b4
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ArchiveLogCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.shell.commands;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.shell.Command;
+import org.jline.reader.LineReader;
+import org.jline.terminal.Terminal;
+
+public class ArchiveLogCommand implements Command {
+
+ @Override public String getHelpMessage() {
+ return "`archive` - archive the given log at already configured location";
+ }
+
+ @Override
+ public void run(Terminal terminal, LineReader lineReader, LogServiceClient
client, String[] args) {
+ if (args.length != 1) {
+ terminal.writer().println("ERROR - Usage: archive <name> ");
+ return;
+ }
+ String logName = args[0];
+ try {
+ client.archiveLog(LogName.of(logName));
+ terminal.writer().println("Archive Log request is submitted
successfully!!");
+ } catch (Exception e) {
+ terminal.writer().println("Failed to archive log!!");
+ e.printStackTrace(terminal.writer());
+ }
+ }
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
new file mode 100644
index 0000000..11c1df7
--- /dev/null
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/shell/commands/ExportLogCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.logservice.shell.commands;
+
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.client.LogServiceClient;
+import org.apache.ratis.logservice.shell.Command;
+import org.jline.reader.LineReader;
+import org.jline.terminal.Terminal;
+
+public class ExportLogCommand implements Command {
+
+ @Override public String getHelpMessage() {
+ return "`export` - export the given log at given location starting from
provided recordId";
+ }
+
+ @Override
+ public void run(Terminal terminal, LineReader lineReader, LogServiceClient
client, String[] args) {
+ if (args.length != 3) {
+ terminal.writer().println("ERROR - Usage: export <name> <location>
<recordId>");
+ return;
+ }
+ String logName = args[0];
+ String location = args[1];
+ long recordId = Long.parseLong(args[2]);
+ try {
+ client.exportLog(LogName.of(logName), location,recordId);
+ terminal.writer().println("Export Log request is submitted
successfully!!");
+ } catch (Exception e) {
+ terminal.writer().println("Failed to export log!!");
+ e.printStackTrace(terminal.writer());
+ }
+ }
+}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 2cc4c58..8b8356e 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -29,7 +29,7 @@ import org.apache.ratis.logservice.api.LogStream;
import org.apache.ratis.logservice.api.LogStream.State;
import org.apache.ratis.logservice.proto.LogServiceProtos;
import org.apache.ratis.logservice.proto.LogServiceProtos.*;
-import org.apache.ratis.logservice.proto.MetaServiceProtos.*;
+import org.apache.ratis.logservice.server.ArchivalInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
public class LogServiceProtoUtil {
@@ -56,11 +56,17 @@ public class LogServiceProtoUtil {
return logStreamProto;
}
- public static LogServiceRequestProto toCloseLogRequestProto(LogName logName)
{
+ public static LogServiceRequestProto toChangeStateRequestProto(LogName
logName, State state,
+ boolean force) {
LogNameProto logNameProto =
LogNameProto.newBuilder().setName(logName.getName()).build();
- CloseLogRequestProto closeLog =
- CloseLogRequestProto.newBuilder().setLogName(logNameProto).build();
- return LogServiceRequestProto.newBuilder().setCloseLog(closeLog).build();
+ ChangeStateLogRequestProto changeLog =
+ ChangeStateLogRequestProto.newBuilder().setLogName(logNameProto)
+ .setState(LogStreamState.valueOf(state.name())).build();
+ return
LogServiceRequestProto.newBuilder().setChangeState(changeLog).build();
+ }
+
+ public static LogServiceRequestProto toChangeStateRequestProto(LogName
logName, State state) {
+ return toChangeStateRequestProto(logName, state, false);
}
public static LogServiceRequestProto toGetStateRequestProto(LogName logName)
{
@@ -70,8 +76,11 @@ public class LogServiceProtoUtil {
return LogServiceRequestProto.newBuilder().setGetState(getState).build();
}
- public static ArchiveLogReplyProto toArchiveLogReplyProto() {
+ public static ArchiveLogReplyProto toArchiveLogReplyProto(Throwable t) {
ArchiveLogReplyProto.Builder builder = ArchiveLogReplyProto.newBuilder();
+ if (t != null) {
+ builder.setException(toLogException(t));
+ }
return builder.build();
}
@@ -163,11 +172,10 @@ public class LogServiceProtoUtil {
return retVal;
}
- public static GetStateReplyProto toGetStateReplyProto(boolean exists) {
- return GetStateReplyProto.newBuilder().build();
+ public static GetStateReplyProto toGetStateReplyProto(State state) {
+ return
GetStateReplyProto.newBuilder().setState(LogStreamState.valueOf(state.name())).build();
}
-
public static GetLogLengthReplyProto toGetLogLengthReplyProto(long length,
Throwable t) {
GetLogLengthReplyProto.Builder builder =
GetLogLengthReplyProto.newBuilder();
if (t != null) {
@@ -245,6 +253,19 @@ public class LogServiceProtoUtil {
return builder.build();
}
+ public static ArchiveLogRequestProto toExportInfoProto(ArchivalInfo info) {
+ return ArchiveLogRequestProto.newBuilder().setIsExport(true)
+ .setLastArchivedRaftIndex(info.getLastArchivedIndex())
+ .setLocation(info.getArchiveLocation()).setLogName(
+
LogServiceProtos.LogNameProto.newBuilder().setName(info.getArchiveLogName().getName())
+
.build()).setStatus(ArchivalStatus.valueOf(info.getStatus().name())).build();
+ }
+
+ public static ArchivalInfo toExportInfo(ArchiveLogRequestProto proto){
+ return new ArchivalInfo(proto.getLocation()).updateArchivalInfo(proto);
+
+ }
+
public GetLogLengthReplyProto toGetLogLengthReplyProto(long length) {
GetLogLengthReplyProto.Builder builder =
GetLogLengthReplyProto.newBuilder();
builder.setLength(length);
@@ -265,4 +286,26 @@ public class LogServiceProtoUtil {
return builder.build();
}
+ public static LogServiceRequestProto toExportInfoRequestProto(LogName
logName){
+ LogServiceProtos.LogNameProto logNameProto =
+
LogServiceProtos.LogNameProto.newBuilder().setName(logName.getName()).build();
+ GetExportInfoRequestProto exportInfoRequestProto =
+
GetExportInfoRequestProto.newBuilder().setLogName(logNameProto).build();
+ return
LogServiceRequestProto.newBuilder().setExportInfo(exportInfoRequestProto).build();
+ }
+
+ public static LogServiceRequestProto toArchiveLogRequestProto(LogName
logName, String location,
+ long raftIndex, boolean isArchival, ArchivalInfo.ArchivalStatus status) {
+ LogServiceProtos.LogNameProto logNameProto =
+
LogServiceProtos.LogNameProto.newBuilder().setName(logName.getName()).build();
+ ArchiveLogRequestProto.Builder builder =
+ ArchiveLogRequestProto.newBuilder().setLogName(logNameProto)
+
.setLastArchivedRaftIndex(raftIndex).setStatus(ArchivalStatus.valueOf(status.name()));
+ builder.setIsExport(!isArchival);
+ if (location != null) {
+ builder.setLocation(location);
+ }
+ ArchiveLogRequestProto archiveLog = builder.build();
+ return
LogServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
+ }
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
index c44853f..f5c6d01 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java
@@ -18,6 +18,8 @@
package org.apache.ratis.logservice.util;
+import org.apache.hadoop.fs.Path;
+import org.apache.ratis.logservice.api.LogName;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -50,4 +52,22 @@ public class LogServiceUtils {
}
}
+
+ public static String getArchiveLocationForLog(String location, LogName
logName) {
+ return location + "/" + logName.getName();
+ }
+
+ public static String getRolledPathForArchiveWriter(Path path, long
lastWrittenId) {
+ return path + "_recordId_" + lastWrittenId;
+ }
+
+ public static Integer getRecordIdFromRolledArchiveFile(Path path) {
+ String[] splits = path.getName().toString().split("_recordId_");
+ if (splits.length != 2) {
+ //currently written file, should be read last
+ return Integer.MAX_VALUE;
+ }
+ return Integer.parseInt(splits[1]);
+ }
+
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
index 82f530e..90227b1 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/MetaServiceProtoUtil.java
@@ -101,15 +101,6 @@ public class MetaServiceProtoUtil {
return MetaServiceRequestProto.newBuilder().setGetLog(getLog).build();
}
- public static MetaServiceRequestProto toArchiveLogRequestProto(LogName
logName) {
- LogServiceProtos.LogNameProto logNameProto =
LogServiceProtos.LogNameProto.newBuilder()
- .setName(logName.getName())
- .build();
- ArchiveLogRequestProto archiveLog =
-
ArchiveLogRequestProto.newBuilder().setLogName(logNameProto).build();
- return
MetaServiceRequestProto.newBuilder().setArchiveLog(archiveLog).build();
- }
-
public static MetaServiceRequestProto toDeleteLogRequestProto(LogName
logName) {
LogServiceProtos.LogNameProto logNameProto =
LogServiceProtos.LogNameProto.newBuilder()
.setName(logName.getName())
diff --git a/ratis-logservice/src/main/proto/LogService.proto
b/ratis-logservice/src/main/proto/LogService.proto
index 2dc4a32..5c06f95 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -35,17 +35,32 @@ message LogStreamProto {
enum LogStreamState {
OPEN = 0;
CLOSED = 1;
+ ARCHIVING = 2;
+ ARCHIVED = 3;
+ DELETED = 4;
}
-message CloseLogRequestProto {
+enum ArchivalStatus {
+ SUBMITTED = 0;
+ STARTED = 1;
+ RUNNING = 2;
+ INTERRUPTED = 3;
+ COMPLETED = 4;
+ FAILED = 5;
+}
+
+message ChangeStateLogRequestProto {
LogNameProto logName = 1;
+ LogStreamState state = 2;
+ LogServiceException exception = 3;
+ bool force = 4;
}
message GetStateRequestProto {
LogNameProto logName = 1;
}
-message CloseLogReplyProto {
+message ChangeStateReplyProto {
}
message GetStateReplyProto {
@@ -143,9 +158,30 @@ message GetLogLastCommittedIndexReplyProto {
LogServiceException exception = 2;
}
+message ArchiveLogRequestProto {
+ LogNameProto logName = 1;
+ string location = 2;
+ uint64 lastArchivedRaftIndex = 3;
+ bool isExport = 4;
+ ArchivalStatus status = 5;
+}
+
+message ArchiveLogReplyProto {
+ LogServiceException exception = 1;
+}
+
+message GetExportInfoRequestProto {
+ LogNameProto logName = 1;
+}
+
+message GetExportInfoReplyProto {
+ repeated ArchiveLogRequestProto info = 1;
+ LogServiceException exception = 2;
+}
+
message LogServiceRequestProto {
oneof Request {
- CloseLogRequestProto closeLog = 1;
+ ChangeStateLogRequestProto changeState = 1;
GetStateRequestProto getState = 2;
ReadLogRequestProto readNextQuery = 3;
GetLogLengthRequestProto lengthQuery = 4;
@@ -154,6 +190,8 @@ message LogServiceRequestProto {
SyncLogRequestProto syncRequest = 7;
GetLogLastCommittedIndexRequestProto lastIndexQuery = 8;
GetLogSizeRequestProto sizeRequest = 9;
+ ArchiveLogRequestProto archiveLog = 10;
+ GetExportInfoRequestProto exportInfo= 11;
}
}
diff --git a/ratis-logservice/src/main/proto/MetaService.proto
b/ratis-logservice/src/main/proto/MetaService.proto
index 7c94d1b..16e0232 100644
--- a/ratis-logservice/src/main/proto/MetaService.proto
+++ b/ratis-logservice/src/main/proto/MetaService.proto
@@ -54,10 +54,6 @@ message GetLogRequestProto {
LogNameProto logName = 1;
}
-message ArchiveLogRequestProto {
- LogNameProto logName = 1;
-}
-
message DeleteLogRequestProto {
LogNameProto logName = 1;
}
@@ -78,10 +74,6 @@ message ListLogsReplyProto {
}
-message ArchiveLogReplyProto {
- MetaServiceExceptionProto exception = 1;
-}
-
message DeleteLogReplyProto {
MetaServiceExceptionProto exception = 1;
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index 925ebac..090e1eb 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -84,7 +84,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends
MiniRaftCluster>
return super.getStartRecordId();
}
- @Override public State getState() {
+ @Override public State getState() throws IOException {
getStateCount++;
return super.getState();
}
@@ -110,7 +110,8 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
}
@Before
public void setUpCluster() throws IOException, InterruptedException {
- cluster = newCluster(NUM_PEERS);
+ RaftProperties raftProperties = getProperties();
+ cluster = getFactory().newCluster(NUM_PEERS, raftProperties);
cluster.start();
RaftTestUtil.waitForLeader(cluster);
}
@@ -166,6 +167,8 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
assertEquals(expected, actual);
}
testJMXMetrics(logStream);
+ assertEquals(logStream.getState(),State.OPEN);
+
}
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
new file mode 100644
index 0000000..4213ac8
--- /dev/null
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/impl/TestArchiveHdfsLogReaderAndWriter.java
@@ -0,0 +1,153 @@
+package org.apache.ratis.logservice.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Bytes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.ratis.logservice.api.ArchiveLogReader;
+import org.apache.ratis.logservice.api.ArchiveLogWriter;
+import org.apache.ratis.logservice.api.LogName;
+import org.apache.ratis.logservice.util.LogServiceUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestArchiveHdfsLogReaderAndWriter {
+ static MiniDFSCluster cluster;
+ static Configuration conf;
+ private static String location;
+
+ @BeforeClass public static void setup() throws IOException {
+ conf = new Configuration();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ location = "target/tmp/archive/TestArchiveHdfsLogReaderAndWriter";
+ }
+
+ @Test public void testRollingWriter() throws IOException {
+ String archiveLocation = location+"/testRollingWriter";
+ LogName logName = LogName.of("testRollingWriterLogName");
+ DistributedFileSystem fs = cluster.getFileSystem();
+ fs.delete(new Path(archiveLocation), true);
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
+ writer.init(archiveLocation, logName);
+ int k = 2;
+ write(writer, 1, k);
+ Assert.assertEquals(writer.getLastWrittenRecordId(), k);
+ writer.rollWriter();
+ String[] files = Arrays.stream(
+ fs.listStatus(new
Path(LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName))))
+ .map(fileStatus ->
fileStatus.getPath().getName()).toArray(String[]::new);
+ String[] expectedFiles = { logName.getName(), logName.getName() +
"_recordId_" + k };
+ Assert.assertArrayEquals(expectedFiles, files);
+ ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
+ LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+ verifyRecords(reader, k);
+ Assert.assertEquals(writer.getLastWrittenRecordId(), reader.getPosition());
+ write(writer, k + 1, 2 * k);
+ Assert.assertEquals(writer.getLastWrittenRecordId(), 2 * k);
+ writer.close();
+ reader = new ArchiveHdfsLogReader(conf,
+ LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+ verifyRecords(reader, 2 * k);
+
+ files = ((ArchiveHdfsLogReader) reader).getFiles().stream()
+ .map(fileStatus ->
fileStatus.getPath().getName()).toArray(String[]::new);
+ String[] expectedFiles1 =
+ { logName.getName() + "_recordId_" + k, logName.getName() +
"_recordId_" + 2 * k };
+ Assert.assertArrayEquals(expectedFiles1, files);
+ reader.close();
+ }
+
+ private void verifyRecords(ArchiveLogReader reader, int n) throws
IOException {
+ for (int i = 1; i <= n; i++) {
+ Assert.assertEquals(i, ByteBuffer.wrap(reader.next()).getInt());
+ }
+ Assert.assertFalse(reader.hasNext());
+ Assert.assertTrue(reader.next() == null);
+ try {
+ reader.readNext();
+ Assert.fail();
+ } catch (NoSuchElementException e) {
+ //expected
+ }
+ }
+
+ private void write(ArchiveLogWriter writer, int start, int end) throws
IOException {
+ for (Integer i = start; i <= end; i++) {
+ writer.write(ByteBuffer.allocate(4).putInt(i));
+ }
+ }
+
+ @Test public void testCorruptedFileEOF() throws IOException {
+ FSDataOutputStream fos = FileSystem.get(conf).create(new
Path(location,"testEOF"));
+ fos.write(ByteBuffer.allocate(4).putInt(4).array());
+ fos.write(new byte[4]);
+ fos.write(ByteBuffer.allocate(4).putInt(4).array());
+ // but data is just 2 bytes
+ fos.write(new byte[2]);
+ fos.close();
+ ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
location+"/testEOF");
+ try {
+ reader.next();
+ Assert.fail();
+ } catch (EOFException e) {
+ //expected
+ }
+
+ }
+
+ @Test public void testSeek() throws IOException {
+ String archiveLocation = location+"/testSeek";
+ LogName logName = LogName.of("testSeek");
+ DistributedFileSystem fs = cluster.getFileSystem();
+ fs.delete(new Path(archiveLocation), true);
+ ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
+ writer.init(archiveLocation, logName);
+ int k = 100;
+ write(writer, 1, k);
+ writer.close();
+ ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
+ LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
+ reader.seek(80);
+ Assert.assertEquals(80, reader.getPosition());
+ int count = 0;
+ while (reader.next() != null) {
+ count++;
+ }
+ Assert.assertEquals(20, count);
+ }
+
+ @AfterClass
+ public static void teardownafterclass(){
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ deleteLocalDirectory(new File(location));
+ }
+
+ static boolean deleteLocalDirectory(File dir) {
+ File[] allFiles = dir.listFiles();
+ if (allFiles != null) {
+ for (File file : allFiles) {
+ deleteLocalDirectory(file);
+ }
+ }
+ return dir.delete();
+ }
+
+}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 685cf9b..886f058 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -19,13 +19,14 @@
package org.apache.ratis.logservice.server;
import org.apache.ratis.logservice.api.*;
+import org.apache.ratis.logservice.api.LogStream.State;
import org.apache.ratis.logservice.client.LogServiceClient;
-import org.apache.ratis.logservice.common.Constants;
import org.apache.ratis.logservice.common.LogAlreadyExistException;
import org.apache.ratis.logservice.common.LogNotFoundException;
import org.apache.ratis.logservice.metrics.LogServiceMetricsRegistry;
import org.apache.ratis.logservice.proto.MetaServiceProtos;
import org.apache.ratis.logservice.util.LogServiceCluster;
+import org.apache.ratis.logservice.util.TestUtils;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.junit.AfterClass;
@@ -33,11 +34,11 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@@ -70,6 +71,7 @@ public class TestMetaServer {
listCount.incrementAndGet();
return super.listLogs();
}
+
};
@BeforeClass
public static void beforeClass() {
@@ -92,12 +94,10 @@ public class TestMetaServer {
*/
@Test
public void testCreateAndGetLog() throws Exception {
-
// This should be LogServiceStream ?
LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
assertNotNull(logStream1);
LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
-
testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.GETLOG.name(),1l);
assertNotNull(logStream2);
}
@@ -126,6 +126,98 @@ public class TestMetaServer {
assert(res.array().length > 0);
}
+ @Test
+ public void testLogArchival() throws IOException, InterruptedException {
+ LogName logName = LogName.of("testArchivalLog");
+ LogStream logStream = client.createLog(logName);
+ LogWriter writer = logStream.createWriter();
+ List<LogInfo> listLogs = client.listLogs();
+ assert (listLogs.stream()
+ .filter(log ->
log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+ List<LogServer> workers = cluster.getWorkers();
+ List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+ writer.write(records);
+ client.closeLog(logName);
+ assertEquals(logStream.getState(), State.CLOSED);
+ client.archiveLog(logName);
+ int retry = 0;
+ while (logStream.getState() != State.ARCHIVED && retry <= 40) {
+ Thread.sleep(1000);
+ retry++;
+ }
+ assertEquals(logStream.getState(), State.ARCHIVED);
+ LogReader reader = logStream.createReader();
+ List<ByteBuffer> data = reader.readBulk(records.size());
+ assertEquals(records.size(), data.size());
+ reader.seek(1);
+ data = reader.readBulk(records.size());
+ assertEquals(records.size() - 1, data.size());
+
+ //Test ArchiveLogStream
+ LogServiceConfiguration config = LogServiceConfiguration.create();
+ LogStream archiveLogStream = client.getArchivedLog(logName);
+ reader = archiveLogStream.createReader();
+ data = reader.readBulk(records.size());
+ assertEquals(records.size(), data.size());
+ }
+
+ @Test
+ public void testLogExport() throws IOException, InterruptedException {
+ LogName logName = LogName.of("testLogExport");
+ LogStream logStream = client.createLog(logName);
+ LogWriter writer = logStream.createWriter();
+ List<LogInfo> listLogs = client.listLogs();
+ assert (listLogs.stream()
+ .filter(log ->
log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+ List<LogServer> workers = cluster.getWorkers();
+ List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+ writer.write(records);
+ String location1 = "target/tmp/export_1/";
+ String location2 = "target/tmp/export_2/";
+ deleteLocalDirectory(new File(location1));
+ deleteLocalDirectory(new File(location2));
+ int startPosition1 = 3;
+ int startPosition2 = 5;
+ client.exportLog(logName, location1, startPosition1);
+ client.exportLog(logName, location2, startPosition2);
+ List<ArchivalInfo> infos=client.getExportStatus(logName);
+ int count=0;
+ while (infos.size() > 1 && (
+ infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
+ || infos.get(1).getStatus() !=
ArchivalInfo.ArchivalStatus.COMPLETED)
+ && count < 10) {
+ infos = client.getExportStatus(logName);
+ ;
+ Thread.sleep(1000);
+ count++;
+
+ }
+
+ //Test ExportLogStream
+ LogStream exportLogStream = client.getExportLog(logName, location1);
+ LogReader reader = exportLogStream.createReader();
+ List<ByteBuffer> data = reader.readBulk(records.size());
+ assertEquals(records.size() - startPosition1, data.size());
+ reader.close();
+ exportLogStream = client.getExportLog(logName, location2);
+ reader = exportLogStream.createReader();
+ data = reader.readBulk(records.size());
+ assertEquals(records.size() - startPosition2, data.size());
+ reader.close();
+ writer.close();
+ }
+
+ boolean deleteLocalDirectory(File dir) {
+ File[] allFiles = dir.listFiles();
+ if (allFiles != null) {
+ for (File file : allFiles) {
+ deleteLocalDirectory(file);
+ }
+ }
+ return dir.delete();
+ }
+
+
/**
* Test for Delete operation
* @throws IOException
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
index 4f0c658..3187868 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/TestLogServiceProtoUtil.java
@@ -165,7 +165,7 @@ public class TestLogServiceProtoUtil {
@Ignore
public void testGetStateReply() {
LogStream logStream = null;
- GetStateReplyProto proto = LogServiceProtoUtil.toGetStateReplyProto(true);
+ GetStateReplyProto proto =
LogServiceProtoUtil.toGetStateReplyProto(LogStream.State.OPEN);
//TODO finish
}
@@ -175,8 +175,9 @@ public class TestLogServiceProtoUtil {
@Test
public void testCloseLogRequest() {
LogName name = LogName.of("test");
- LogServiceRequestProto proto =
LogServiceProtoUtil.toCloseLogRequestProto(name);
- CloseLogRequestProto request = proto.getCloseLog();
+ LogServiceRequestProto proto =
LogServiceProtoUtil.toChangeStateRequestProto(name,
+ LogStream.State.CLOSED);
+ ChangeStateLogRequestProto request = proto.getChangeState();
assertEquals(name.getName(), request.getLogName().getName());
//TODO finish
}
diff --git a/ratis-logservice/src/test/resources/logservice.xml
b/ratis-logservice/src/test/resources/logservice.xml
new file mode 100644
index 0000000..a509774
--- /dev/null
+++ b/ratis-logservice/src/test/resources/logservice.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>logservice.archival.location</name>
+ <value>target/tmp/archive_1/</value>
+ <description>Base directory where logs are archived</description>
+ </property>
+</configuration>
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 705876e..bc8961e 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -45,8 +45,8 @@ import java.util.concurrent.atomic.AtomicReference;
* Base implementation for StateMachines.
*/
public class BaseStateMachine implements StateMachine {
- private final CompletableFuture<RaftServer> server = new
CompletableFuture<>();
- private volatile RaftGroupId groupId;
+ protected final CompletableFuture<RaftServer> server = new
CompletableFuture<>();
+ protected volatile RaftGroupId groupId;
protected final LifeCycle lifeCycle = new
LifeCycle(getClass().getSimpleName());
private final AtomicReference<TermIndex> lastAppliedTermIndex = new
AtomicReference<>();