http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java index 2262437..5733a60 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java @@ -18,91 +18,12 @@ */ package org.apache.asterix.common.replication; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.asterix.common.config.MetadataProperties; -import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; -import org.apache.hyracks.api.config.IConfigManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.common.config.ConfigManager; -import org.apache.hyracks.control.common.controllers.NCConfig; public class MetadataOnlyReplicationStrategy implements IReplicationStrategy { - private String metadataPrimaryReplicaId; - private Replica metadataPrimaryReplica; - private Set<Replica> metadataNodeReplicas; - MetadataProperties metadataProperties; - @Override public boolean isMatch(int datasetId) { return datasetId < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId >= 0; } - - @Override - public Set<Replica> getRemoteReplicas(String nodeId) { - if (nodeId.equals(metadataPrimaryReplicaId)) { - return metadataNodeReplicas; - } - return 
Collections.emptySet(); - } - - @Override - public Set<Replica> getRemoteReplicasAndSelf(String nodeId) { - - if (nodeId.equals(metadataPrimaryReplicaId)) { - Set<Replica> replicasAndSelf = new HashSet<>(); - replicasAndSelf.addAll(metadataNodeReplicas); - replicasAndSelf.add(metadataPrimaryReplica); - return replicasAndSelf; - } - return Collections.emptySet(); - } - - @Override - public Set<Replica> getRemotePrimaryReplicas(String nodeId) { - if (metadataNodeReplicas.stream().map(Replica::getId).filter(replicaId -> replicaId.equals(nodeId)) - .count() != 0) { - return new HashSet<>(Arrays.asList(metadataPrimaryReplica)); - } - return Collections.emptySet(); - } - - @Override - public MetadataOnlyReplicationStrategy from(ReplicationProperties p, IConfigManager configManager) - throws HyracksDataException { - MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy(); - st.metadataProperties = p.getMetadataProperties(); - st.metadataPrimaryReplicaId = st.metadataProperties.getMetadataNodeName(); - st.metadataPrimaryReplica = new Replica(st.metadataPrimaryReplicaId, - ((ConfigManager) configManager).getNodeEffectiveConfig(st.metadataPrimaryReplicaId) - .getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS), - ((ConfigManager) configManager).getNodeEffectiveConfig(st.metadataPrimaryReplicaId) - .getInt(NCConfig.Option.REPLICATION_LISTEN_PORT)); - final Set<Replica> replicas = new HashSet<>(); - Set<String> candidateSet = new HashSet<>(); - candidateSet.addAll(((ConfigManager) (configManager)).getNodeNames()); - candidateSet.remove(st.metadataPrimaryReplicaId); - String[] candidateAry = new String[candidateSet.size()]; - candidateSet.toArray(candidateAry); - for (int i = 0; i < candidateAry.length && i < p.getReplicationFactor(); i++) { - replicas.add(new Replica(candidateAry[i], - ((ConfigManager) configManager).getNodeEffectiveConfig(candidateAry[i]) - .getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS), - ((ConfigManager) 
configManager).getNodeEffectiveConfig(candidateAry[i]) - .getInt(NCConfig.Option.REPLICATION_LISTEN_PORT))); - } - st.metadataNodeReplicas = replicas; - return st; - } - - @Override - public boolean isParticipant(String nodeId) { - return true; - } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java index c32ee3c..c3d9ced 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java @@ -18,42 +18,10 @@ */ package org.apache.asterix.common.replication; -import java.util.Collections; -import java.util.Set; - -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.hyracks.api.config.IConfigManager; -import org.apache.hyracks.control.common.config.ConfigManager; -import org.apache.hyracks.control.common.controllers.NCConfig; - public class NoReplicationStrategy implements IReplicationStrategy { @Override public boolean isMatch(int datasetId) { return false; } - - @Override - public boolean isParticipant(String nodeId) { - return false; - } - - @Override - public Set<Replica> getRemotePrimaryReplicas(String nodeId) { - return Collections.emptySet(); - } - - @Override - public Set<Replica> getRemoteReplicas(String node) { - return Collections.emptySet(); - } - - public Set<Replica> getRemoteReplicasAndSelf(String nodeId) { - return Collections.emptySet(); - } - - @Override - public NoReplicationStrategy from(ReplicationProperties p, IConfigManager configManager) { - return new NoReplicationStrategy(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java deleted file mode 100644 index 52dbd25..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.replication; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -public class Replica { - - public enum ReplicaState { - ACTIVE, - DEAD, - UNKNOWN - } - - private ReplicaState state = ReplicaState.UNKNOWN; - String nodeId; - String ipAddr; - int port; - - public Replica(String id, String ip, int port) { - nodeId = id; - ipAddr = ip; - this.port = port; - } - - public ReplicaState getState() { - return state; - } - - public void setState(ReplicaState state) { - this.state = state; - } - - public static Replica create(DataInput input) throws IOException { - Replica replica = new Replica(null, null, -1); - replica.readFields(input); - return replica; - } - - public String getId() { - return nodeId; - } - - public void writeFields(DataOutput output) throws IOException { - output.writeUTF(nodeId); - output.writeUTF(ipAddr); - output.writeInt(port); - output.writeInt(state.ordinal()); - } - - public void readFields(DataInput input) throws IOException { - this.nodeId = input.readUTF(); - this.ipAddr = input.readUTF(); - this.port = input.readInt(); - this.state = ReplicaState.values()[input.readInt()]; - } - - public String getClusterIp() { - return ipAddr; - } - - public void setClusterIp(String ip) { - ipAddr = ip; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public void serialize(OutputStream out) throws IOException { - DataOutputStream dos = new DataOutputStream(out); - writeFields(dos); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Replica)) { - return false; - } - Replica other = (Replica) o; - return nodeId.equals(other.getId()); - } - - @Override - public int hashCode() { - return nodeId.hashCode(); - } -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java deleted file mode 100644 index ae02ca9..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.replication; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; - -public class ReplicaEvent { - - Replica replica; - ClusterEventType eventType; - - public ReplicaEvent(Replica replica, ClusterEventType eventType) { - this.replica = replica; - this.eventType = eventType; - } - - public Replica getReplica() { - return replica; - } - - public void setReplica(Replica replica) { - this.replica = replica; - } - - public ClusterEventType getEventType() { - return eventType; - } - - public void setEventType(ClusterEventType eventType) { - this.eventType = eventType; - } - - public void serialize(OutputStream out) throws IOException { - DataOutputStream dos = new DataOutputStream(out); - replica.writeFields(dos); - dos.writeInt(eventType.ordinal()); - } - - public static ReplicaEvent create(DataInput input) throws IOException { - Replica replica = Replica.create(input); - ClusterEventType eventType = ClusterEventType.values()[input.readInt()]; - ReplicaEvent event = new ReplicaEvent(replica, eventType); - return event; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java index e6b6445..8d51a99 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java @@ -21,39 +21,31 @@ 
package org.apache.asterix.common.replication; import java.util.HashMap; import java.util.Map; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.hyracks.api.config.IConfigManager; -import org.apache.hyracks.api.exceptions.HyracksDataException; - public class ReplicationStrategyFactory { private static final Map<String, Class<? extends IReplicationStrategy>> BUILT_IN_REPLICATION_STRATEGY = new HashMap<>(); static { - BUILT_IN_REPLICATION_STRATEGY.put("no_replication", NoReplicationStrategy.class); - BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", ChainedDeclusteringReplicationStrategy.class); - BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", MetadataOnlyReplicationStrategy.class); + BUILT_IN_REPLICATION_STRATEGY.put("none", NoReplicationStrategy.class); + BUILT_IN_REPLICATION_STRATEGY.put("all", AllDatasetsReplicationStrategy.class); + BUILT_IN_REPLICATION_STRATEGY.put("metadata", MetadataOnlyReplicationStrategy.class); } private ReplicationStrategyFactory() { throw new AssertionError(); } - public static IReplicationStrategy create(String name, ReplicationProperties repProp, IConfigManager ncConfig) - throws HyracksDataException { + public static IReplicationStrategy create(String name) { String strategyName = name.toLowerCase(); if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) { - throw new RuntimeDataException(ErrorCode.UNSUPPORTED_REPLICATION_STRATEGY, String.format( - "%s. Available strategies: %s", strategyName, BUILT_IN_REPLICATION_STRATEGY.keySet().toString())); + throw new IllegalStateException("Couldn't find strategy with name: " + name); } Class<? 
extends IReplicationStrategy> clazz = BUILT_IN_REPLICATION_STRATEGY.get(strategyName); try { - return clazz.newInstance().from(repProp, ncConfig); + return clazz.newInstance(); } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeDataException(ErrorCode.INSTANTIATION_ERROR, e, clazz.getName()); + throw new IllegalStateException("Couldn't instantiated replication strategy: " + name, e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java index 8cc11a8..c7de7a3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java @@ -91,6 +91,10 @@ public class ResourceReference { ref.root = tokens[--offset]; } + public int getPartitionNum() { + return Integer.parseInt(partition.substring(StorageConstants.PARTITION_DIR_PREFIX.length())); + } + protected static void parseLegacyPath(ResourceReference ref, String path) { // old format: root/partition/dataverse/datasetName_idx_IndexName/fileName final String[] tokens = path.split(File.separator); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 4090b65..3274849 
100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -20,7 +20,6 @@ package org.apache.asterix.common.transactions; import java.nio.ByteBuffer; -import org.apache.asterix.common.replication.IReplicationThread; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; public interface ILogRecord { @@ -133,8 +132,6 @@ public interface ILogRecord { public void readRemoteLog(ByteBuffer buffer); - public void setReplicationThread(IReplicationThread replicationThread); - public void setLogSource(byte logSource); public byte getLogSource(); @@ -150,7 +147,7 @@ public interface ILogRecord { public void setReplicated(boolean replicated); /** - * @return a flag indicating whether the log record should be sent to remote replicas + * @return a flag indicating whether the log was replicated */ public boolean isReplicated(); @@ -169,4 +166,18 @@ public interface ILogRecord { public void logAppended(long lsn); public long getPreviousMarkerLSN(); + + /** + * Sets flag indicating if this log should be replicated or not + * + * @param replicate + */ + void setReplicate(boolean replicate); + + /** + * Gets a flag indicating if this log should be replicated or not + * + * @return the flag + */ + boolean isReplicate(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java new file mode 100644 index 0000000..8e874a2 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRequester.java @@ 
-0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.transactions; + +public interface ILogRequester { + + /** + * Sends a notification to the log requester that {@code logRecord} has been flushed. + * + * @param logRecord The log that has been flushed. 
+ */ + void notifyFlushed(ILogRecord logRecord); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index dea7a67..bfe7963 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -119,4 +119,15 @@ public interface IRecoveryManager { * @throws ACIDException */ void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException; + + /** + * Replay the committed transactions' logs belonging to {@code partitions}. If {@code flush} is true, + * all datasets are flushed after the logs are replayed. 
+ * + * @param partitions + * @param flush + * @throws HyracksDataException + */ + void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException; + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index 68afb2a..6b38aa1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.CRC32; import org.apache.asterix.common.context.PrimaryIndexOperationTracker; -import org.apache.asterix.common.replication.IReplicationThread; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; @@ -94,18 +93,21 @@ public class LogRecord implements ILogRecord { private final CRC32 checksumGen; private int[] PKFields; private PrimaryIndexOperationTracker opTracker; - private IReplicationThread replicationThread; + /** * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only * to indicate the source of the log and how many indexes were flushed using its LSN. 
*/ private int numOfFlushedIndexes; private String nodeId; - private boolean replicated = false; + private final AtomicBoolean replicated; + private boolean replicate = false; + private ILogRequester requester; public LogRecord(ILogMarkerCallback callback) { this.callback = callback; isFlushed = new AtomicBoolean(false); + replicated = new AtomicBoolean(false); readPKValue = new PrimaryKeyTupleReference(); readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference(); readOldValue = SimpleTupleWriter.INSTANCE.createTupleReference(); @@ -638,15 +640,6 @@ public class LogRecord implements ILogRecord { this.nodeId = nodeId; } - public IReplicationThread getReplicationThread() { - return replicationThread; - } - - @Override - public void setReplicationThread(IReplicationThread replicationThread) { - this.replicationThread = replicationThread; - } - @Override public void setLogSource(byte logSource) { this.logSource = logSource; @@ -684,13 +677,13 @@ public class LogRecord implements ILogRecord { } @Override - public void setReplicated(boolean replicate) { - this.replicated = replicate; + public void setReplicated(boolean replicated) { + this.replicated.set(replicated); } @Override public boolean isReplicated() { - return replicated; + return replicated.get(); } @Override @@ -732,4 +725,22 @@ public class LogRecord implements ILogRecord { public ByteBuffer getMarker() { return marker; } + + @Override + public void setReplicate(boolean replicate) { + this.replicate = replicate; + } + + @Override + public boolean isReplicate() { + return replicate; + } + + public ILogRequester getRequester() { + return requester; + } + + public void setRequester(ILogRequester requester) { + this.requester = requester; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml 
index 7072e48..bb983ad 100644 --- a/asterixdb/asterix-replication/pom.xml +++ b/asterixdb/asterix-replication/pom.xml @@ -51,10 +51,6 @@ <artifactId>hyracks-util</artifactId> </dependency> <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-storage-am-lsm-common</artifactId> </dependency> @@ -72,14 +68,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-nc</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-control-common</artifactId> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java index 001d41f..807ae5f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java @@ -19,7 +19,6 @@ package org.apache.asterix.replication.api; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.replication.IReplicationThread; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IReplicaTask extends IReplicationMessage { @@ -31,5 +30,5 @@ public interface IReplicaTask extends IReplicationMessage { * @param worker * @throws HyracksDataException */ - void perform(INcApplicationContext appCtx, 
IReplicationThread worker) throws HyracksDataException; + void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java index 2e1cb8a..bc6e87b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java @@ -20,7 +20,7 @@ package org.apache.asterix.replication.api; import java.io.OutputStream; -import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IReplicationMessage { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java new file mode 100644 index 0000000..8840c3f --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.api; + +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.asterix.common.transactions.LogRecord; + +public interface IReplicationWorker extends Runnable { + + /** + * @return The replication socket channel. + */ + SocketChannel getChannel(); + + /** + * Gets a reusable buffer that can be used to send data + * + * @return the reusable buffer + */ + ByteBuffer getReusableBuffer(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java new file mode 100644 index 0000000..f752745 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.api; + +import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP; +import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED; +import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.replication.IPartitionReplica; +import org.apache.asterix.common.storage.ReplicaIdentifier; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.replication.sync.ReplicaSynchronizer; +import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.StorageUtil; +import org.apache.hyracks.util.annotations.ThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import 
com.fasterxml.jackson.databind.node.ObjectNode; + +@ThreadSafe +public class PartitionReplica implements IPartitionReplica { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE); + private final INcApplicationContext appCtx; + private final ReplicaIdentifier id; + private ByteBuffer reusbaleBuf; + private PartitionReplicaStatus status = DISCONNECTED; + private SocketChannel sc; + + public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) { + this.id = id; + this.appCtx = appCtx; + } + + @Override + public synchronized PartitionReplicaStatus getStatus() { + return status; + } + + @Override + public ReplicaIdentifier getIdentifier() { + return id; + } + + @Override + public synchronized void notifyFailure(Exception failure) { + setStatus(DISCONNECTED); + } + + public synchronized void sync() { + if (status == IN_SYNC || status == CATCHING_UP) { + return; + } + setStatus(CATCHING_UP); + appCtx.getThreadExecutor().execute(() -> { + try { + new ReplicaSynchronizer(appCtx, this).sync(); + setStatus(IN_SYNC); + } catch (Exception e) { + LOGGER.error(() -> "Failed to sync replica " + this, e); + notifyFailure(e); + } finally { + close(); + } + }); + } + + public synchronized SocketChannel getChannel() { + try { + if (sc == null || !sc.isOpen() || !sc.isConnected()) { + sc = SocketChannel.open(); + sc.configureBlocking(true); + sc.connect(id.getLocation()); + } + return sc; + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public synchronized void close() { + try { + if (sc != null && sc.isOpen()) { + ReplicationProtocol.sendGoodbye(sc); + sc.close(); + sc = null; + } + } catch (IOException e) { + LOGGER.warn("Failed to close channel", e); + } + } + + public synchronized ByteBuffer getReusableBuffer() { + if (reusbaleBuf == null) { 
+ reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + } + return reusbaleBuf; + } + + private JsonNode asJson() { + ObjectNode json = OBJECT_MAPPER.createObjectNode(); + json.put("id", id.toString()); + json.put("state", status.name()); + return json; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionReplica that = (PartitionReplica) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + try { + return JSONUtil.convertNode(asJson()); + } catch (JsonProcessingException e) { + throw new ReplicationException(e); + } + } + + private synchronized void setStatus(PartitionReplicaStatus status) { + if (this.status == status) { + return; + } + LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status); + this.status = status; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java new file mode 100644 index 0000000..a092322 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.api; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.replication.IPartitionReplica; +import org.apache.asterix.common.replication.IReplicationDestination; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ReplicationDestination implements IReplicationDestination { + + private static final Logger LOGGER = LogManager.getLogger(); + private final Set<IPartitionReplica> replicas = new HashSet<>(); + private final InetSocketAddress location; + private SocketChannel logRepChannel; + + private ReplicationDestination(InetSocketAddress location) { + this.location = location; + } + + public static ReplicationDestination at(InetSocketAddress location) { + return new ReplicationDestination(location); + } + + @Override + public synchronized void add(IPartitionReplica replica) { + replicas.add(replica); + } + + @Override + public synchronized void remove(IPartitionReplica replica) { + replicas.remove(replica); + } + + @Override + public synchronized void 
notifyFailure(Exception failure) { + replicas.forEach(replica -> replica.notifyFailure(failure)); + closeLogReplicationChannel(); + } + + @Override + public Set<IPartitionReplica> getReplicas() { + return new HashSet<>(replicas); + } + + public synchronized Optional<IPartitionReplica> getPartitionReplica(int partition) { + return replicas.stream().filter(replica -> replica.getIdentifier().getPartition() == partition + && replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny(); + } + + public synchronized SocketChannel getLogReplicationChannel() { + try { + if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) { + logRepChannel = SocketChannel.open(); + logRepChannel.configureBlocking(true); + logRepChannel.connect(location); + } + return logRepChannel; + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + private synchronized void closeLogReplicationChannel() { + try { + if (logRepChannel != null && logRepChannel.isOpen()) { + ReplicationProtocol.sendGoodbye(logRepChannel); + logRepChannel.close(); + logRepChannel = null; + } + } catch (IOException e) { + LOGGER.warn("Exception while closing socket", e); + } + } + + @Override + public InetSocketAddress getLocation() { + return location; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationDestination that = (ReplicationDestination) o; + return Objects.equals(location, that.location); + } + + @Override + public String toString() { + return location.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(location); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java deleted file mode 100644 index f5b2378..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * Request carrying a set of partition ids and a set of file names.
 * Wire layout: partition count, each partition id, file count, each file name
 * (written with {@link DataOutputStream#writeUTF}).
 */
public class ReplicaFilesRequest {
    private final Set<Integer> partitionIds;
    private final Set<String> existingFiles;

    public ReplicaFilesRequest(Set<Integer> partitionIds, Set<String> existingFiles) {
        this.existingFiles = existingFiles;
        this.partitionIds = partitionIds;
    }

    public Set<String> getExistingFiles() {
        return existingFiles;
    }

    public Set<Integer> getPartitionIds() {
        return partitionIds;
    }

    /**
     * Reads a request in the layout produced by {@link #serialize(OutputStream)}.
     *
     * @param input the source to read from
     * @return the decoded request
     * @throws IOException if reading fails
     */
    public static ReplicaFilesRequest create(DataInput input) throws IOException {
        final Set<Integer> partitions = readPartitions(input);
        final Set<String> files = readFileNames(input);
        return new ReplicaFilesRequest(partitions, files);
    }

    /**
     * Writes this request to the given stream.
     *
     * @param out the destination stream
     * @throws IOException if writing fails
     */
    public void serialize(OutputStream out) throws IOException {
        final DataOutputStream sink = new DataOutputStream(out);
        sink.writeInt(partitionIds.size());
        for (Integer partition : partitionIds) {
            sink.writeInt(partition);
        }
        sink.writeInt(existingFiles.size());
        for (String name : existingFiles) {
            sink.writeUTF(name);
        }
    }

    // Reads a count followed by that many ints.
    private static Set<Integer> readPartitions(DataInput input) throws IOException {
        final int count = input.readInt();
        final Set<Integer> partitions = new HashSet<>(count);
        int remaining = count;
        while (remaining-- > 0) {
            partitions.add(input.readInt());
        }
        return partitions;
    }

    // Reads a count followed by that many modified-UTF strings.
    private static Set<String> readFileNames(DataInput input) throws IOException {
        final int count = input.readInt();
        final Set<String> names = new HashSet<>(count);
        int remaining = count;
        while (remaining-- > 0) {
            names.add(input.readUTF());
        }
        return names;
    }
}
mode 100644 index a3c269d..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaIndexFlushRequest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * Request carrying a set of resource ids.
 * Wire layout: id count followed by each id as a long.
 */
public class ReplicaIndexFlushRequest {
    Set<Long> laggingRescouresIds;

    public ReplicaIndexFlushRequest(Set<Long> laggingRescouresIds) {
        this.laggingRescouresIds = laggingRescouresIds;
    }

    public void setLaggingRescouresIds(Set<Long> laggingRescouresIds) {
        this.laggingRescouresIds = laggingRescouresIds;
    }

    public Set<Long> getLaggingRescouresIds() {
        return laggingRescouresIds;
    }

    /**
     * Reads a request in the layout produced by {@link #serialize(OutputStream)}.
     *
     * @param input the source to read from
     * @return the decoded request
     * @throws IOException if reading fails
     */
    public static ReplicaIndexFlushRequest create(DataInput input) throws IOException {
        final int count = input.readInt();
        final Set<Long> ids = new HashSet<Long>(count);
        int remaining = count;
        while (remaining-- > 0) {
            ids.add(input.readLong());
        }
        return new ReplicaIndexFlushRequest(ids);
    }

    /**
     * Writes this request to the given stream.
     *
     * @param out the destination stream
     * @throws IOException if writing fails
     */
    public void serialize(OutputStream out) throws IOException {
        final DataOutputStream sink = new DataOutputStream(out);
        sink.writeInt(laggingRescouresIds.size());
        for (Long id : laggingRescouresIds) {
            sink.writeLong(id);
        }
    }
}
/**
 * Request carrying a set of replica ids and a starting LSN.
 * Wire layout: id count, each id (written with {@link DataOutputStream#writeUTF}),
 * then the LSN as a long.
 */
public class ReplicaLogsRequest {
    Set<String> replicaIds;
    long fromLSN;

    public ReplicaLogsRequest(Set<String> replicaIds, long fromLSN) {
        this.fromLSN = fromLSN;
        this.replicaIds = replicaIds;
    }

    public long getFromLSN() {
        return fromLSN;
    }

    public void setFromLSN(long fromLSN) {
        this.fromLSN = fromLSN;
    }

    public Set<String> getReplicaIds() {
        return replicaIds;
    }

    public void setReplicaIds(Set<String> replicaIds) {
        this.replicaIds = replicaIds;
    }

    /**
     * Reads a request in the layout produced by {@link #serialize(OutputStream)}.
     *
     * @param input the source to read from
     * @return the decoded request
     * @throws IOException if reading fails
     */
    public static ReplicaLogsRequest create(DataInput input) throws IOException {
        final int count = input.readInt();
        final Set<String> ids = new HashSet<String>(count);
        int remaining = count;
        while (remaining-- > 0) {
            ids.add(input.readUTF());
        }
        // the LSN trails the id list
        return new ReplicaLogsRequest(ids, input.readLong());
    }

    /**
     * Writes this request to the given stream.
     *
     * @param out the destination stream
     * @throws IOException if writing fails
     */
    public void serialize(OutputStream out) throws IOException {
        final DataOutputStream sink = new DataOutputStream(out);
        sink.writeInt(replicaIds.size());
        for (String id : replicaIds) {
            sink.writeUTF(id);
        }
        sink.writeLong(fromLSN);
    }
}
- */ -package org.apache.asterix.replication.functions; - -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.common.replication.ReplicaEvent; -import org.apache.asterix.replication.api.IReplicationMessage; -import org.apache.asterix.replication.management.NetworkingUtil; -import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; -import org.apache.asterix.replication.messaging.DeleteFileTask; -import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; -import org.apache.asterix.replication.messaging.PartitionResourcesListTask; -import org.apache.asterix.replication.messaging.ReplicateFileTask; -import org.apache.asterix.replication.storage.LSMComponentProperties; -import org.apache.asterix.replication.storage.LSMIndexFileProperties; -import org.apache.asterix.replication.storage.PartitionReplica; -import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream; - -public class ReplicationProtocol { - - /** - * All replication messages start with ReplicationRequestType (4 bytes), then the length of the request in bytes - */ - public static final String JOB_REPLICATION_ACK = "$"; - - public static final int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES; - private static final int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES; - - /* - * ReplicationRequestType: - * REPLICATE_LOG: txn log replication - * REPLICATE_FILE: replicate a file(s) - * DELETE_FILE: delete a file(s) - * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components - * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN - * GOODBYE: used to notify replicas that the replication request has been completed - * REPLICA_EVENT: used to 
notify replicas about a remote replica split/merge. - * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent - * ACK: used to notify the requesting replica that the request has been completed successfully - * FLUSH_INDEX: request remote replica to flush an LSM component - */ - public enum ReplicationRequestType { - REPLICATE_LOG, - REPLICATE_FILE, - DELETE_FILE, - GET_REPLICA_FILES, - GET_REPLICA_MAX_LSN, - GOODBYE, - REPLICA_EVENT, - LSM_COMPONENT_PROPERTIES, - ACK, - FLUSH_INDEX, - PARTITION_RESOURCES_REQUEST, - PARTITION_RESOURCES_RESPONSE, - REPLICATE_RESOURCE_FILE, - DELETE_RESOURCE_FILE, - CHECKPOINT_PARTITION - } - - public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException { - //read request size - NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES); - int requestSize = dataBuffer.getInt(); - - if (dataBuffer.capacity() < requestSize) { - dataBuffer = ByteBuffer.allocate(requestSize); - } - - //read request - NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize); - - return dataBuffer; - } - - public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer) - throws IOException { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - lsmCompProp.serialize(oos); - int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); - if (buffer.capacity() < requestSize) { - buffer = ByteBuffer.allocate(requestSize); - } else { - buffer.clear(); - } - buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal()); - buffer.putInt(oos.size()); - buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); - buffer.flip(); - return buffer; - } - } - - public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer) - throws 
IOException { - //read replication request type - NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE); - - ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer.getInt()]; - return requestType; - } - - public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - DataInputStream dis = new DataInputStream(bais); - return LSMComponentProperties.create(dis); - } - - public static ByteBuffer getGoodbyeBuffer() { - ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE); - bb.putInt(ReplicationRequestType.GOODBYE.ordinal()); - bb.flip(); - return bb; - } - - public static ByteBuffer getAckBuffer() { - ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE); - bb.putInt(ReplicationRequestType.ACK.ordinal()); - bb.flip(); - return bb; - } - - public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp, - ReplicationRequestType requestType) throws IOException { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - afp.serialize(oos); - int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); - if (requestBuffer.capacity() < requestSize) { - requestBuffer = ByteBuffer.allocate(requestSize); - } else { - requestBuffer.clear(); - } - requestBuffer.putInt(requestType.ordinal()); - requestBuffer.putInt(oos.size()); - requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); - requestBuffer.flip(); - return requestBuffer; - } - } - - public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - DataInputStream dis = 
new DataInputStream(bais); - return LSMIndexFileProperties.create(dis); - } - - public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - event.serialize(oos); - ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size()); - buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal()); - buffer.putInt(oos.size()); - buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); - buffer.flip(); - return buffer; - } - } - - public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - DataInputStream dis = new DataInputStream(bais); - - return ReplicaEvent.create(dis); - } - - public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) - throws IOException { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - request.serialize(oos); - - int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); - if (buffer.capacity() < requestSize) { - buffer = ByteBuffer.allocate(requestSize); - } else { - buffer.clear(); - } - buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal()); - buffer.putInt(oos.size()); - buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); - buffer.flip(); - return buffer; - } - } - - public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request) - throws IOException { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - request.serialize(oos); - int requestSize = 
REPLICATION_REQUEST_HEADER_SIZE + oos.size(); - if (buffer.capacity() < requestSize) { - buffer = ByteBuffer.allocate(requestSize); - } else { - buffer.clear(); - } - buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal()); - buffer.putInt(oos.size()); - buffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); - buffer.flip(); - return buffer; - } - } - - public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - DataInputStream dis = new DataInputStream(bais); - return ReplicaFilesRequest.create(dis); - } - - public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - DataInputStream dis = new DataInputStream(bais); - return ReplicaIndexFlushRequest.create(dis); - } - - public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) { - requestBuffer.clear(); - requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal()); - requestBuffer.flip(); - } - - public static int getJobIdFromLogAckMessage(String msg) { - return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1))); - } - - public static String getNodeIdFromLogAckMessage(String msg) { - return msg.substring(0, msg.indexOf(JOB_REPLICATION_ACK)); - } - - /** - * Sends a goodbye request to a remote replica indicating the end of a replication request. - * - * @param socketChannel - * the remote replica socket. 
- * @throws IOException - */ - public static void sendGoodbye(SocketChannel socketChannel) throws IOException { - ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer(); - NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer); - } - - public static void sendAck(SocketChannel socketChannel) throws IOException { - ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer(); - NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer); - } - - public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) { - try { - buf.clear(); - buf.putInt(ReplicationRequestType.ACK.ordinal()); - buf.flip(); - NetworkingUtil.transferBufferToChannel(socketChannel, buf); - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - public static void waitForAck(PartitionReplica replica) throws IOException { - final SocketChannel channel = replica.getChannel(); - final ByteBuffer buf = replica.gerReusableBuffer(); - ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf); - if (responseFunction != ReplicationRequestType.ACK) { - throw new IllegalStateException("Unexpected response while waiting for ack."); - } - } - - public static void sendTo(PartitionReplica replica, IReplicationMessage task) { - final SocketChannel channel = replica.getChannel(); - final ByteBuffer buf = replica.gerReusableBuffer(); - sendTo(channel, task, buf); - } - - public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) { - ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream(); - try (DataOutputStream oos = new DataOutputStream(outputStream)) { - task.serialize(oos); - final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size(); - final ByteBuffer requestBuffer = ensureSize(buf, requestSize); - requestBuffer.putInt(task.getMessageType().ordinal()); - requestBuffer.putInt(oos.size()); - requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength()); - requestBuffer.flip(); - NetworkingUtil.transferBufferToChannel(channel, requestBuffer); - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException { - final ReplicationRequestType type = getRequestType(socketChannel, buffer); - return readMessage(type, socketChannel, buffer); - } - - public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel, - ByteBuffer buffer) { - try { - ReplicationProtocol.readRequest(socketChannel, buffer); - final ByteArrayInputStream input = - new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); - try (DataInputStream dis = new DataInputStream(input)) { - switch (type) { - case PARTITION_RESOURCES_REQUEST: - return PartitionResourcesListTask.create(dis); - case PARTITION_RESOURCES_RESPONSE: - return PartitionResourcesListResponse.create(dis); - case REPLICATE_RESOURCE_FILE: - return ReplicateFileTask.create(dis); - case DELETE_RESOURCE_FILE: - return DeleteFileTask.create(dis); - case CHECKPOINT_PARTITION: - return CheckpointPartitionIndexesTask.create(dis); - default: - throw new IllegalStateException("Unrecognized replication message"); - } - } - } catch (IOException e) { - throw new ReplicationException(e); - } - } - - private static ByteBuffer ensureSize(ByteBuffer buffer, int size) { - if (buffer.capacity() < size) { - return ByteBuffer.allocate(size); - } - buffer.clear(); - return buffer; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java deleted file mode 100644 index 0444952..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.replication.logging; - -import java.util.concurrent.atomic.AtomicInteger; - -public class RemoteLogMapping { - - private String remoteNodeID; - private long remoteLSN; - private long localLSN; - public AtomicInteger numOfFlushedIndexes = new AtomicInteger(); - - public void setRemoteNodeID(String remoteNodeID) { - this.remoteNodeID = remoteNodeID; - } - - public long getRemoteLSN() { - return remoteLSN; - } - - public void setRemoteLSN(long remoteLSN) { - this.remoteLSN = remoteLSN; - } - - public long getLocalLSN() { - return localLSN; - } - - public void setLocalLSN(long localLSN) { - this.localLSN = localLSN; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Remote Node: " + remoteNodeID); - sb.append(" Remote LSN: " + remoteLSN); - sb.append(" Local LSN: " + localLSN); - return sb.toString(); - } - - public String getNodeUniqueLSN() { - return TxnLogUtil.getNodeUniqueLSN(remoteNodeID, remoteLSN); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java new file mode 100644 index 0000000..0b496b8 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogRecord.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.logging; + +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.common.transactions.LogRecord; + +public class RemoteLogRecord extends LogRecord { + + private long masterLsn; + private IReplicationWorker replicationWorker; + + public long getMasterLsn() { + return masterLsn; + } + + public void setMasterLsn(long masterLsn) { + this.masterLsn = masterLsn; + } + + public IReplicationWorker getReplicationWorker() { + return replicationWorker; + } + + public void setReplicationWorker(IReplicationWorker replicationWorker) { + this.replicationWorker = replicationWorker; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java new file mode 100644 index 0000000..d63496f --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.logging; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.LocalResource; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +class RemoteLogsNotifier implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(); + private final PersistentLocalResourceRepository localResourceRep; + private 
final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; + private final LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ; + private final INcApplicationContext appCtx; + + public RemoteLogsNotifier(INcApplicationContext appCtx, LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ) { + this.appCtx = appCtx; + this.remoteLogsQ = remoteLogsQ; + localResourceRep = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); + } + + @Override + public void run() { + final String nodeId = appCtx.getServiceContext().getNodeId(); + Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName()); + while (!Thread.currentThread().isInterrupted()) { + try { + final RemoteLogRecord logRecord = remoteLogsQ.take(); + switch (logRecord.getLogType()) { + case LogType.JOB_COMMIT: + case LogType.ABORT: + // send ACK to requester + logRecord.getReplicationWorker().getChannel().socket().getOutputStream() + .write((nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId() + System + .lineSeparator()).getBytes()); + break; + case LogType.FLUSH: + checkpointReplicaIndexes(logRecord, logRecord.getDatasetId()); + break; + default: + throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + LOGGER.error("Failed to process replicated log", e); + } + } + } + + private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId) throws HyracksDataException { + final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions(); + final Predicate<LocalResource> replicaIndexesPredicate = lr -> { + DatasetLocalResource dls = (DatasetLocalResource) lr.getResource(); + return dls.getDatasetId() == datasetId && !masterPartitions.contains(dls.getPartition()); + }; + final Map<Long, LocalResource> resources = 
localResourceRep.getResources(replicaIndexesPredicate); + final List<DatasetResourceReference> replicaIndexesRef = + resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) { + final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(replicaIndexRef); + synchronized (indexCheckpointManager) { + indexCheckpointManager.masterFlush(remoteLogMapping.getMasterLsn(), remoteLogMapping.getLSN()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java new file mode 100644 index 0000000..c054e6c --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.logging; + +import java.nio.ByteBuffer; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.ILogRequester; +import org.apache.asterix.common.transactions.LogSource; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class RemoteLogsProcessor implements ILogRequester { + + private static final Logger LOGGER = LogManager.getLogger(); + private final LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ; + private final ILogManager logManager; + + public RemoteLogsProcessor(INcApplicationContext appCtx) { + logManager = appCtx.getTransactionSubsystem().getLogManager(); + remoteLogsQ = new LinkedBlockingQueue<>(); + appCtx.getThreadExecutor().execute(new RemoteLogsNotifier(appCtx, remoteLogsQ)); + } + + public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) { + while (logsBatch.hasRemaining()) { + // get rid of log size + logsBatch.getInt(); + reusableLog.readRemoteLog(logsBatch); + reusableLog.setLogSource(LogSource.REMOTE); + switch (reusableLog.getLogType()) { + case LogType.UPDATE: + case LogType.ENTITY_COMMIT: + logManager.log(reusableLog); + break; + case LogType.JOB_COMMIT: + case LogType.ABORT: + RemoteLogRecord jobTerminationLog = new RemoteLogRecord(); + TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, reusableLog.getTxnId(), + reusableLog.getLogType() == LogType.JOB_COMMIT); + jobTerminationLog.setRequester(this); + 
jobTerminationLog.setReplicationWorker(worker); + jobTerminationLog.setLogSource(LogSource.REMOTE); + logManager.log(jobTerminationLog); + break; + case LogType.FLUSH: + RemoteLogRecord flushLog = new RemoteLogRecord(); + TransactionUtil + .formFlushLogRecord(flushLog, reusableLog.getDatasetId(), null, reusableLog.getNodeId(), + reusableLog.getNumOfFlushedIndexes()); + flushLog.setRequester(this); + flushLog.setLogSource(LogSource.REMOTE); + flushLog.setMasterLsn(reusableLog.getLSN()); + logManager.log(flushLog); + break; + default: + LOGGER.error(() -> "Unsupported LogType: " + reusableLog.getLogType()); + } + } + } + + @Override + public void notifyFlushed(ILogRecord logRecord) { + remoteLogsQ.add((RemoteLogRecord) logRecord); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java index 283f69f..a94f073 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java @@ -22,9 +22,10 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.replication.management.ReplicationManager; +import org.apache.asterix.replication.management.LogReplicationManager; public class ReplicationLogBuffer { + private final int logBufferSize; private final AtomicBoolean full; private int appendOffset; @@ -32,10 +33,10 @@ public class ReplicationLogBuffer { private final ByteBuffer 
appendBuffer; private final ByteBuffer replicationBuffer; private boolean stop; - private ReplicationManager replicationManager; + private final LogReplicationManager replicationManager; private final int batchSize; - public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) { + public ReplicationLogBuffer(LogReplicationManager replicationManager, int logBufferSize, int batchSize) { this.replicationManager = replicationManager; this.logBufferSize = logBufferSize; this.batchSize = batchSize; @@ -116,7 +117,7 @@ public class ReplicationLogBuffer { private void transferBuffer(ByteBuffer buffer) { if (buffer.remaining() <= batchSize) { //the current batch can be sent as it is - replicationManager.replicateTxnLogBatch(buffer); + replicationManager.transferBatch(buffer); return; } /** @@ -141,7 +142,7 @@ public class ReplicationLogBuffer { //return to the beginning of the batch position buffer.reset(); } - replicationManager.replicateTxnLogBatch(buffer); + replicationManager.transferBatch(buffer); //return the original limit to check the new remaining size buffer.limit(totalTransferLimit); } @@ -159,7 +160,7 @@ public class ReplicationLogBuffer { return logBufferSize; } - public ReplicationManager getReplicationManager() { + public LogReplicationManager getReplicationManager() { return replicationManager; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java new file mode 100644 index 0000000..1b0ac23 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAck.java @@ -0,0 +1,49 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.logging; + +import java.util.Set; + +import org.apache.asterix.common.replication.IReplicationDestination; +import org.apache.asterix.common.transactions.ILogRecord; + +public class TxnAck { + + private final Set<IReplicationDestination> pendingAcks; + private final ILogRecord logRecord; + + public TxnAck(ILogRecord logRecord, Set<IReplicationDestination> pendingAcks) { + this.logRecord = logRecord; + this.pendingAcks = pendingAcks; + } + + public synchronized void ack(IReplicationDestination dest) { + pendingAcks.remove(dest); + if (allAcked()) { + synchronized (logRecord) { + logRecord.setReplicated(true); + logRecord.notifyAll(); + } + } + } + + public synchronized boolean allAcked() { + return pendingAcks.isEmpty(); + } +}