http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index 6445345..727a379 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -18,1342 +18,120 @@ */ package org.apache.asterix.replication.management; -import java.io.BufferedReader; -import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.RandomAccessFile; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.FileChannel; -import java.nio.channels.SocketChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; -import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.replication.IPartitionReplica; -import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.replication.IReplicationDestination; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.replication.Replica; -import org.apache.asterix.common.replication.Replica.ReplicaState; -import org.apache.asterix.common.replication.ReplicaEvent; -import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.replication.ReplicationStrategyFactory; -import org.apache.asterix.common.storage.DatasetResourceReference; -import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; -import org.apache.asterix.common.storage.ResourceReference; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.replication.functions.ReplicaFilesRequest; -import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; -import org.apache.asterix.replication.functions.ReplicationProtocol; -import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType; -import org.apache.asterix.replication.logging.ReplicationLogBuffer; -import org.apache.asterix.replication.logging.TxnLogReplicator; -import org.apache.asterix.replication.storage.LSMComponentProperties; -import org.apache.asterix.replication.storage.LSMIndexFileProperties; -import org.apache.asterix.replication.storage.ReplicaResourcesManager; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; -import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.api.config.IApplicationConfig; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.asterix.replication.api.ReplicationDestination; import org.apache.hyracks.api.replication.IReplicationJob; -import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType; -import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType; -import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; -import org.apache.hyracks.control.common.controllers.NCConfig; -import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; -import org.apache.hyracks.util.StorageUtil; -import org.apache.hyracks.util.StorageUtil.StorageUnit; -import org.apache.logging.log4j.Level; +import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -/** - * This class is used to process replication jobs and maintain remote replicas states - */ +@ThreadSafe public class ReplicationManager implements IReplicationManager { private static final Logger LOGGER = LogManager.getLogger(); - private static final int INITIAL_REPLICATION_FACTOR = 1; - private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000; - private final String nodeId; - private ExecutorService replicationListenerThreads; - private final Map<Long, Set<String>> txnCommitAcks; - private final Map<Long, ILogRecord> replicationTxnsPendingAcks; - private ByteBuffer dataBuffer; - private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ; - private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ; - - private int replicationFactor = 1; - private final ReplicaResourcesManager replicaResourcesManager; - private final ILogManager logManager; - private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider; + private final Map<InetSocketAddress, ReplicationDestination> dests = new HashMap<>(); private final ReplicationProperties replicationProperties; - private final Map<String, Replica> replicas; - private final Map<String, Set<Integer>> replica2PartitionsMap; - - private final AtomicBoolean replicationSuspended; - private AtomicBoolean terminateJobsReplication; - private AtomicBoolean jobsReplicationSuspended; - private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUnit.KILOBYTE); - private final Set<String> shuttingDownReplicaIds; - //replication threads - private ReplicationJobsProccessor replicationJobsProcessor; - private final ReplicasEventsMonitor replicationMonitor; - //dummy job used to stop ReplicationJobsProccessor thread. - private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new ReplicationJob(ReplicationJobType.METADATA, - ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC, null); - //used to identify the correct IP address when the node has multiple network interfaces - private String hostIPAddressFirstOctet = null; + private final IReplicationStrategy strategy; + private final INcApplicationContext appCtx; + private final LogReplicationManager logReplicationManager; + private final IndexReplicationManager lsnIndexReplicationManager; - private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ; - private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ; - protected ReplicationLogBuffer currentTxnLogBuffer; - private TxnLogReplicator txnlogReplicator; - private Future<? extends Object> txnLogReplicatorTask; - private SocketChannel[] logsRepSockets; - private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES); - private IReplicationStrategy replicationStrategy; - private final PersistentLocalResourceRepository localResourceRepo; - private NCConfig ncConfig; - private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; - - //TODO this class needs to be refactored by moving its private classes to separate files - //and possibly using MessageBroker to send/receive remote replicas events. - public ReplicationManager(String nodeId, ReplicationProperties replicationProperties, - IReplicaResourcesManager remoteResoucesManager, ILogManager logManager, - IAppRuntimeContextProvider asterixAppRuntimeContextProvider, INCServiceContext ncServiceContext) { - this.nodeId = nodeId; - this.ncConfig = ((NodeControllerService) ncServiceContext.getControllerService()).getConfiguration(); + public ReplicationManager(INcApplicationContext appCtx, ReplicationProperties replicationProperties) { this.replicationProperties = replicationProperties; - try { - replicationStrategy = ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy(), - replicationProperties, ncConfig.getConfigManager()); - } catch (HyracksDataException e) { - LOGGER.log(Level.WARN, "Couldn't initialize replication strategy", e); - } - this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager; - this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; - this.logManager = logManager; - localResourceRepo = - (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository(); - this.hostIPAddressFirstOctet = ncConfig.getPublicAddress().substring(0, 3); - this.indexCheckpointManagerProvider = - asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider(); - replicas = new HashMap<>(); - replicationJobsQ = new LinkedBlockingQueue<>(); - replicaEventsQ = new LinkedBlockingQueue<>(); - terminateJobsReplication = new AtomicBoolean(false); - jobsReplicationSuspended = new AtomicBoolean(true); - replicationSuspended = new AtomicBoolean(true); - txnCommitAcks = new ConcurrentHashMap<>(); - replicationTxnsPendingAcks = new ConcurrentHashMap<>(); - shuttingDownReplicaIds = new HashSet<>(); - dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - replicationMonitor = new ReplicasEventsMonitor(); - //add list of replicas from configurations (To be read from another source e.g. Zookeeper) - Set<Replica> replicaNodes = replicationStrategy.getRemoteReplicas(nodeId); - - //Used as async listeners from replicas - replicationListenerThreads = Executors.newCachedThreadPool(); - replicationJobsProcessor = new ReplicationJobsProccessor(); - - Map<String, ClusterPartition[]> nodePartitions = - asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions(); - replica2PartitionsMap = new HashMap<>(replicaNodes.size()); - for (Replica replica : replicaNodes) { - replicas.put(replica.getId(), replica); - //for each remote replica, get the list of replication clients - Set<Replica> nodeReplicationClients = replicationStrategy.getRemotePrimaryReplicas(replica.getId()); - //get the partitions of each client - List<Integer> clientPartitions = new ArrayList<>(); - for (Replica client : nodeReplicationClients) { - for (ClusterPartition clusterPartition : nodePartitions.get(client.getId())) { - clientPartitions.add(clusterPartition.getPartitionId()); - } - } - Set<Integer> clientPartitonsSet = new HashSet<>(clientPartitions.size()); - clientPartitonsSet.addAll(clientPartitions); - replica2PartitionsMap.put(replica.getId(), clientPartitonsSet); - } - int numLogBuffers = replicationProperties.getLogBufferNumOfPages(); - emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers); - pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers); - - int logBufferSize = replicationProperties.getLogBufferPageSize(); - for (int i = 0; i < numLogBuffers; i++) { - emptyLogBuffersQ - .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize())); - } + this.appCtx = appCtx; + strategy = ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy()); + logReplicationManager = new LogReplicationManager(appCtx, this); + lsnIndexReplicationManager = new IndexReplicationManager(appCtx, this); } @Override - public void submitJob(IReplicationJob job) throws IOException { - if (job.getExecutionType() == ReplicationExecutionType.ASYNC) { - replicationJobsQ.offer(job); - } else { - //wait until replication is resumed - while (replicationSuspended.get()) { - synchronized (replicationSuspended) { - try { - replicationSuspended.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - processJob(job, null, null); + public void register(IPartitionReplica replica) { + synchronized (dests) { + final InetSocketAddress location = replica.getIdentifier().getLocation(); + final ReplicationDestination replicationDest = dests.computeIfAbsent(location, ReplicationDestination::at); + replicationDest.add(replica); + logReplicationManager.register(replicationDest); + lsnIndexReplicationManager.register(replicationDest); } } @Override - public void replicateLog(ILogRecord logRecord) throws InterruptedException { - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { - //if replication is suspended, wait until it is resumed. - while (replicationSuspended.get()) { - synchronized (replicationSuspended) { - replicationSuspended.wait(); - } - } - Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>()); - replicaIds.add(nodeId); - txnCommitAcks.put(logRecord.getTxnId(), replicaIds); - } - - appendToLogBuffer(logRecord); - } - - protected void getAndInitNewLargePage(int pageSize) { - // for now, alloc a new buffer for each large page - // TODO: consider pooling large pages - currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize()); - pendingFlushLogBuffersQ.offer(currentTxnLogBuffer); - } - - protected void getAndInitNewPage() throws InterruptedException { - currentTxnLogBuffer = null; - while (currentTxnLogBuffer == null) { - currentTxnLogBuffer = emptyLogBuffersQ.take(); - } - currentTxnLogBuffer.reset(); - pendingFlushLogBuffersQ.offer(currentTxnLogBuffer); - } - - private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException { - if (!currentTxnLogBuffer.hasSpace(logRecord)) { - currentTxnLogBuffer.isFull(true); - if (logRecord.getLogSize() > getLogPageSize()) { - getAndInitNewLargePage(logRecord.getLogSize()); - } else { - getAndInitNewPage(); - } - } - currentTxnLogBuffer.append(logRecord); - } - - /** - * Processes the replication job based on its specifications - * - * @param job - * The replication job - * @param replicasSockets - * The remote replicas sockets to send the request to. - * @param requestBuffer - * The buffer to use to send the request. - * @throws IOException - */ - private void processJob(IReplicationJob job, Map<String, SocketChannel> replicasSockets, ByteBuffer requestBuffer) - throws IOException { - try { - - //all of the job's files belong to a single storage partition. - //get any of them to determine the partition from the file path. - String jobFile = job.getJobFiles().iterator().next(); - DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile); - if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { + public void unregister(IPartitionReplica replica) { + synchronized (dests) { + final InetSocketAddress location = replica.getIdentifier().getLocation(); + final ReplicationDestination dest = dests.get(location); + if (dest == null) { + LOGGER.warn(() -> "Asked to unregister unknown replica " + replica); return; } - int jobPartitionId = indexFileRef.getPartitionId(); - - ByteBuffer responseBuffer = null; - LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties(); - if (requestBuffer == null) { - requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - } - - boolean isLSMComponentFile = job.getJobType() == ReplicationJobType.LSM_COMPONENT; - try { - //if there isn't already a connection, establish a new one - if (replicasSockets == null) { - replicasSockets = getActiveRemoteReplicasSockets(); - } - - int remainingFiles = job.getJobFiles().size(); - if (job.getOperation() == ReplicationOperation.REPLICATE) { - //if the replication job is an LSM_COMPONENT, its properties are sent first, then its files. - ILSMIndexReplicationJob LSMComponentJob = null; - if (job.getJobType() == ReplicationJobType.LSM_COMPONENT) { - //send LSMComponent properties - LSMComponentJob = (ILSMIndexReplicationJob) job; - LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId); - requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp, //NOSONAR - requestBuffer); - sendRequest(replicasSockets, requestBuffer); - } - - for (String filePath : job.getJobFiles()) { - remainingFiles--; - Path path = Paths.get(filePath); - if (Files.notExists(path)) { - LOGGER.log(Level.ERROR, "File deleted before replication: " + filePath); - continue; - } - - LOGGER.log(Level.INFO, "Replicating file: " + filePath); - //open file for reading - try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r"); - FileChannel fileChannel = fromFile.getChannel();) { - - long fileSize = fileChannel.size(); - asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, - remainingFiles == 0); - requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer, - asterixFileProperties, ReplicationRequestType.REPLICATE_FILE); - Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, SocketChannel> entry = iterator.next(); - //if the remote replica is not interested in this partition, skip it. - if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) { - continue; - } - SocketChannel socketChannel = entry.getValue(); - //transfer request header & file - try { - NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer); - NetworkingUtil.sendFile(fileChannel, socketChannel); - if (asterixFileProperties.requiresAck()) { - ReplicationRequestType responseType = - waitForResponse(socketChannel, responseBuffer); - if (responseType != ReplicationRequestType.ACK) { - throw new IOException( - "Could not receive ACK from replica " + entry.getKey()); - } - } - } catch (IOException e) { - handleReplicationFailure(socketChannel, e); - iterator.remove(); - } finally { - requestBuffer.position(0); - } - } - } - } - } else if (job.getOperation() == ReplicationOperation.DELETE) { - for (String filePath : job.getJobFiles()) { - remainingFiles--; - asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, remainingFiles == 0); - ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties, - ReplicationRequestType.DELETE_FILE); - - Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, SocketChannel> entry = iterator.next(); - //if the remote replica is not interested in this partition, skip it. - if (!replica2PartitionsMap.get(entry.getKey()).contains(jobPartitionId)) { - continue; - } - SocketChannel socketChannel = entry.getValue(); - try { - sendRequest(replicasSockets, requestBuffer); - if (asterixFileProperties.requiresAck()) { - waitForResponse(socketChannel, responseBuffer); - } - } catch (IOException e) { - handleReplicationFailure(socketChannel, e); - iterator.remove(); - } finally { - requestBuffer.position(0); - } - } - } - } - } finally { - //if sync, close sockets with replicas since they wont be reused - if (job.getExecutionType() == ReplicationExecutionType.SYNC) { - closeReplicaSockets(replicasSockets); - } - } - } finally { - exitReplicatedLSMComponent(job); - } - } - - private static void exitReplicatedLSMComponent(IReplicationJob job) throws HyracksDataException { - if (job.getOperation() == ReplicationOperation.REPLICATE && job instanceof ILSMIndexReplicationJob) { - //exit the replicated LSM components - ILSMIndexReplicationJob aJob = (ILSMIndexReplicationJob) job; - aJob.endReplication(); - } - } - - /** - * Waits and reads a response from a remote replica - * - * @param socketChannel - * The socket to read the response from - * @param responseBuffer - * The response buffer to read the response to. - * @return The response type. - * @throws IOException - */ - private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer) - throws IOException { - if (responseBuffer == null) { - responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE); - } else { - responseBuffer.clear(); - } - - //read response from remote replicas - ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, responseBuffer); - return responseFunction; - } - - @Override - public boolean isReplicationEnabled() { - return replicationStrategy.isParticipant(nodeId); - } - - @Override - public synchronized void updateReplicaInfo(Replica replicaNode) { - Replica replica = replicas.get(replicaNode.getId()); - //should not update the info of an active replica - if (replica.getState() == ReplicaState.ACTIVE) { - return; - } - replica.setClusterIp(replicaNode.getClusterIp()); - } - - /** - * Suspends processing replication jobs/logs. - * - * @param force - * a flag indicates if replication should be suspended right away or when the pending jobs are completed. - */ - private void suspendReplication(boolean force) { - //suspend replication jobs processing - if (replicationJobsProcessor != null && replicationJobsProcessor.isAlive()) { - if (force) { - terminateJobsReplication.set(true); - } - replicationJobsQ.offer(REPLICATION_JOB_POISON_PILL); - - //wait until the jobs are suspended - synchronized (jobsReplicationSuspended) { - while (!jobsReplicationSuspended.get()) { - try { - jobsReplicationSuspended.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - - //suspend logs replication - if (txnlogReplicator != null) { - endTxnLogReplicationHandshake(); - } - } - - /** - * Opens a new connection with Active remote replicas and starts a listen thread per connection. - */ - private void establishTxnLogReplicationHandshake() { - Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets(); - logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()]; - int i = 0; - //start a listener thread per connection - for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) { - logsRepSockets[i] = entry.getValue(); - replicationListenerThreads - .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue())); - i++; - } - - /* - * establish log replication handshake - */ - ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE) - .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal()); - handshakeBuffer.flip(); - //send handshake request - for (SocketChannel replicaSocket : logsRepSockets) { - try { - NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer); - } catch (IOException e) { - handleReplicationFailure(replicaSocket, e); - } finally { - handshakeBuffer.position(0); - } - } - } - - private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) { - if (LOGGER.isWarnEnabled()) { - LOGGER.log(Level.WARN, "Could not complete replication request.", t); - } - if (socketChannel.isOpen()) { - try { - socketChannel.close(); - } catch (IOException e) { - LOGGER.log(Level.WARN, "Could not close socket.", e); - } - } - reportFailedReplica(getReplicaIdBySocket(socketChannel)); - } - - /** - * Stops TxnLogReplicator and closes the sockets used to replicate logs. - */ - private void endTxnLogReplicationHandshake() { - LOGGER.info("Terminating TxnLogReplicator thread ..."); - txnlogReplicator.terminate(); - try { - txnLogReplicatorTask.get(); - } catch (ExecutionException | InterruptedException e) { - LOGGER.error("TxnLogReplicator thread terminated abnormally", e); - } - LOGGER.info("TxnLogReplicator thread was terminated."); - - /* - * End log replication handshake (by sending a dummy log with a single byte) - */ - ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE + 1).putInt(1).put((byte) 0); - endLogRepHandshake.flip(); - for (SocketChannel replicaSocket : logsRepSockets) { - try { - NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake); - } catch (IOException e) { - handleReplicationFailure(replicaSocket, e); - } finally { - endLogRepHandshake.position(0); - } - } - - //wait for any ACK to arrive before closing sockets. - if (logsRepSockets != null) { - synchronized (txnCommitAcks) { - try { - long waitStartTime = System.currentTimeMillis(); - while (!txnCommitAcks.isEmpty()) { - txnCommitAcks.wait(1000); - long waitDuration = System.currentTimeMillis() - waitStartTime; - if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) { - LOGGER.log(Level.ERROR, - "Timeout before receving all job ACKs from replicas. Pending txns (" - + txnCommitAcks.keySet().toString() + ")"); - break; - } - } - } catch (InterruptedException e) { - LOGGER.error("Interrupted while waiting for jobs ACK", e); - Thread.currentThread().interrupt(); - } - } - } - - /* - * Close log replication sockets - */ - ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer(); - for (SocketChannel replicaSocket : logsRepSockets) { - try { - //send goodbye to remote replica - NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer); - replicaSocket.close(); - } catch (IOException e) { - handleReplicationFailure(replicaSocket, e); - } finally { - goodbyeBuffer.position(0); - } - } - logsRepSockets = null; - } - - /** - * Sends a shutdown event to remote replicas notifying them - * no more logs/files will be sent from this local replica. - * - * @throws IOException - */ - private void sendShutdownNotifiction() throws IOException { - Replica replica = new Replica(nodeId, NetworkingUtil.getHostAddress(hostIPAddressFirstOctet), - ncConfig.getReplicationPublicPort()); - ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_SHUTTING_DOWN); - ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event); - Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets(); - sendRequest(replicaSockets, buffer); - closeReplicaSockets(replicaSockets); - } - - /** - * Sends a request to remote replicas - * - * @param replicaSockets - * The sockets to send the request to. - * @param requestBuffer - * The buffer that contains the request. - */ - private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) { - Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<String, SocketChannel> replicaSocket = iterator.next(); - SocketChannel clientSocket = replicaSocket.getValue(); - try { - NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer); - } catch (IOException e) { - handleReplicationFailure(clientSocket, e); - iterator.remove(); - } finally { - requestBuffer.position(0); - } - } - } - - /** - * Closes the passed replication sockets by sending GOODBYE request to remote replicas. - * - * @param replicaSockets - */ - private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) { - //send goodbye - ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer(); - sendRequest(replicaSockets, goodbyeBuffer); - - Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<String, SocketChannel> replicaSocket = iterator.next(); - SocketChannel clientSocket = replicaSocket.getValue(); - if (clientSocket.isOpen()) { - try { - clientSocket.close(); - } catch (IOException e) { - handleReplicationFailure(clientSocket, e); - } + LOGGER.info(() -> "unregister " + replica); + dest.remove(replica); + if (dest.getReplicas().isEmpty()) { + LOGGER.info(() -> "Removing destination with no replicas " + dest); + logReplicationManager.unregister(dest); + lsnIndexReplicationManager.unregister(dest); + dests.remove(location); } } } @Override - public void initializeReplicasState() { - for (Replica replica : replicas.values()) { - checkReplicaState(replica.getId(), false, false); - } - } - - /** - * Checks the state of a remote replica by trying to ping it. - * - * @param replicaId - * The replica to check the state for. - * @param async - * a flag indicating whether to wait for the result or not. - * @param suspendReplication - * a flag indicating whether to suspend replication on replica state change or not. - */ - private void checkReplicaState(String replicaId, boolean async, boolean suspendReplication) { - Replica replica = replicas.get(replicaId); - - ReplicaStateChecker connector = new ReplicaStateChecker(replica, replicationProperties.getReplicationTimeOut(), - this, suspendReplication); - Future<? extends Object> ft = asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector); - - if (!async) { - //wait until task is done - while (!ft.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - - /** - * Updates the state of a remote replica. - * - * @param replicaId - * The replica id to update. - * @param newState - * The new state of the replica. - * @param suspendReplication - * a flag indicating whether to suspend replication on state change or not. - * @throws InterruptedException - */ - public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) - throws InterruptedException { - Replica replica = replicas.get(replicaId); - - if (replica.getState() == newState) { - return; - } - - if (suspendReplication) { - //prevent new jobs/logs from coming in - replicationSuspended.set(true); - - if (newState == ReplicaState.DEAD) { - //assume the dead replica ACK has been received for all pending jobs - synchronized (txnCommitAcks) { - for (Long txnId : txnCommitAcks.keySet()) { - addAckToJob(txnId, replicaId); - } - } - } - - //force replication threads to stop in order to change the replication factor - suspendReplication(true); - } - - replica.setState(newState); - - if (newState == ReplicaState.ACTIVE) { - replicationFactor++; - } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) { - replicationFactor--; - } - - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Replica " + replicaId + " state changed to: " + newState.name() - + ". Replication factor changed to: " + replicationFactor); - } - - if (suspendReplication) { - startReplicationThreads(); - } - } - - /** - * When an ACK for a JOB_COMMIT is received, it is added to the corresponding job. - * - * @param txnId - * @param replicaId - * The remote replica id the ACK received from. - */ - private void addAckToJob(long txnId, String replicaId) { - synchronized (txnCommitAcks) { - //add ACK to the job - if (txnCommitAcks.containsKey(txnId)) { - Set<String> replicaIds = txnCommitAcks.get(txnId); - replicaIds.add(replicaId); - } else { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Invalid job replication ACK received for txnId(" + txnId + ")"); - } - return; - } - - //if got ACKs from all remote replicas, notify pending jobs if any - - if (txnCommitAcks.get(txnId).size() == replicationFactor && replicationTxnsPendingAcks.containsKey(txnId)) { - ILogRecord pendingLog = replicationTxnsPendingAcks.get(txnId); - synchronized (pendingLog) { - pendingLog.notifyAll(); - } - } - } + public void notifyFailure(IReplicationDestination dest, Exception failure) { + LOGGER.info(() -> "processing failure for " + dest); + appCtx.getThreadExecutor().execute(() -> { + logReplicationManager.unregister(dest); + lsnIndexReplicationManager.unregister(dest); + dest.notifyFailure(failure); + }); } @Override - public boolean hasBeenReplicated(ILogRecord logRecord) { - long txnId = logRecord.getTxnId(); - if (txnCommitAcks.containsKey(txnId)) { - synchronized (txnCommitAcks) { - //check if all ACKs have been received - if (txnCommitAcks.get(txnId).size() == replicationFactor) { - txnCommitAcks.remove(txnId); - - //remove from pending jobs if exists - replicationTxnsPendingAcks.remove(txnId); - - //notify any threads waiting for all jobs to finish - if (txnCommitAcks.size() == 0) { - txnCommitAcks.notifyAll(); - } - return true; - } else { - replicationTxnsPendingAcks.putIfAbsent(txnId, logRecord); - return false; - } - } - } - //presume replicated - return true; - } - - private Map<String, SocketChannel> getActiveRemoteReplicasSockets() { - Map<String, SocketChannel> replicaNodesSockets = new HashMap<>(); - for (Replica replica : replicas.values()) { - if (replica.getState() == ReplicaState.ACTIVE) { - try { - SocketChannel sc = getReplicaSocket(replica.getId()); - replicaNodesSockets.put(replica.getId(), sc); - } catch (IOException e) { - if (LOGGER.isWarnEnabled()) { - LOGGER.log(Level.WARN, "Could not get replica socket", e); - } - reportFailedReplica(replica.getId()); - } - } - } - return replicaNodesSockets; - } - - /** - * Establishes a connection with a remote replica. - * - * @param replicaId - * The replica to connect to. - * @return The socket of the remote replica - * @throws IOException - */ - private SocketChannel getReplicaSocket(String replicaId) throws IOException { - SocketChannel sc = SocketChannel.open(); - sc.configureBlocking(true); - IApplicationConfig config = ncConfig.getConfigManager().getNodeEffectiveConfig(replicaId); - sc.connect(new InetSocketAddress(config.getString(NCConfig.Option.REPLICATION_LISTEN_ADDRESS), - config.getInt(NCConfig.Option.REPLICATION_LISTEN_PORT))); - return sc; + public void replicate(ILogRecord logRecord) throws InterruptedException { + logReplicationManager.replicate(logRecord); } @Override - public Set<String> getDeadReplicasIds() { - Set<String> replicasIds = new HashSet<>(); - for (Replica replica : replicas.values()) { - if (replica.getState() == ReplicaState.DEAD) { - replicasIds.add(replica.getId()); - } - } - return replicasIds; + public IReplicationStrategy getReplicationStrategy() { + return strategy; } @Override - public Set<String> getActiveReplicasIds() { - Set<String> replicasIds = new HashSet<>(); - for (Replica replica : replicas.values()) { - if (replica.getState() == ReplicaState.ACTIVE) { - replicasIds.add(replica.getId()); - } - } - return replicasIds; + public void submitJob(IReplicationJob job) { + lsnIndexReplicationManager.accept(job); } @Override - public int getActiveReplicasCount() { - return getActiveReplicasIds().size(); + public boolean isReplicationEnabled() { + return replicationProperties.isReplicationEnabled(); } @Override public void start() { - //do nothing + // no op } @Override - public void dumpState(OutputStream os) throws IOException { - //do nothing + public void dumpState(OutputStream os) { + // no op } - /** - * Called during NC shutdown to notify remote replicas about the shutdown - * and wait for remote replicas shutdown notification then closes the local - * replication channel. - */ @Override public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { - //stop replication thread afters all jobs/logs have been processed - suspendReplication(false); - - /* - * If this node has any remote replicas, it needs to inform them - * that it is shutting down. - */ - if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) { - //send shutdown event to remote replicas - sendShutdownNotifiction(); - } - - /* - * If this node has any remote primary replicas, then it needs to wait - * until all of them send the shutdown notification. - */ - // find active remote primary replicas - Set<String> activeRemotePrimaryReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream() - .map(Replica::getId).filter(getActiveReplicasIds()::contains).collect(Collectors.toSet()); - - if (!activeRemotePrimaryReplicas.isEmpty()) { - //wait until all shutdown events come from all remote primary replicas - synchronized (shuttingDownReplicaIds) { - while (!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) { - try { - shuttingDownReplicaIds.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - - LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas"); - //close replication channel - asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close(); - - LOGGER.log(Level.INFO, "Replication manager stopped."); - } - - @Override - public void reportReplicaEvent(ReplicaEvent event) { - replicaEventsQ.offer(event); - } - - /** - * Suspends replications and sends a remote replica failure event to ReplicasEventsMonitor. - * - * @param replicaId - * the failed replica id. - */ - public void reportFailedReplica(String replicaId) { - Replica replica = replicas.get(replicaId); - if (replica == null) { - return; - } - if (replica.getState() == ReplicaState.DEAD) { - return; - } - - //need to stop processing any new logs or jobs - terminateJobsReplication.set(true); - - ReplicaEvent event = new ReplicaEvent(replica, ClusterEventType.NODE_FAILURE); - reportReplicaEvent(event); - } - - private String getReplicaIdBySocket(SocketChannel socketChannel) { - InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel); - for (Replica replica : replicas.values()) { - if (replica.getClusterIp().equals(socketAddress.getHostName()) - && ncConfig.getReplicationPublicPort() == socketAddress.getPort()) { - return replica.getId(); - } - } - return null; - } - - @Override - public void startReplicationThreads() throws InterruptedException { - replicationJobsProcessor = new ReplicationJobsProccessor(); - - //start/continue processing jobs/logs - if (logsRepSockets == null) { - establishTxnLogReplicationHandshake(); - getAndInitNewPage(); - txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ); - txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator); - } - - replicationJobsProcessor.start(); - - if (!replicationMonitor.isAlive()) { - replicationMonitor.start(); - } - - //notify any waiting threads that replication has been resumed - synchronized (replicationSuspended) { - LOGGER.log(Level.INFO, "Replication started/resumed"); - replicationSuspended.set(false); - replicationSuspended.notifyAll(); - } - } - - @Override - public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException { - long startLSN = logManager.getAppendLSN(); - Set<String> replicaIds = getActiveReplicasIds(); - if (replicaIds.isEmpty()) { - return; - } - ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - for (String replicaId : replicaIds) { - //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN. - Map<Long, DatasetResourceReference> laggingIndexes = - replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, nonSharpCheckpointTargetLSN); - - if (!laggingIndexes.isEmpty()) { - //2. send request to remote replicas that have lagging indexes. - ReplicaIndexFlushRequest laggingIndexesResponse = null; - try (SocketChannel socketChannel = getReplicaSocket(replicaId)) { - ReplicaIndexFlushRequest laggingIndexesRequest = - new ReplicaIndexFlushRequest(laggingIndexes.keySet()); - requestBuffer = - ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer, laggingIndexesRequest); - NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer); - - //3. remote replicas will respond with indexes that were not flushed. - ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer); - - if (responseFunction == ReplicationRequestType.FLUSH_INDEX) { - requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer); - //returning the indexes that were not flushed - laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer); - } - //send goodbye - ReplicationProtocol.sendGoodbye(socketChannel); - } - - /* - * 4. update checkpoints for indexes that were not flushed - * to the current append LSN to indicate no operations happened - * since the checkpoint start. - */ - if (laggingIndexesResponse != null) { - for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) { - final DatasetResourceReference datasetResourceReference = laggingIndexes.get(resouceId); - indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN); - } - } - } - } - } - - //Recovery Method - @Override - public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException { - long maxRemoteLSN = 0; - - ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer); - Map<String, SocketChannel> replicaSockets = new HashMap<>(); - try { - for (String replicaId : remoteReplicas) { - replicaSockets.put(replicaId, getReplicaSocket(replicaId)); - } - - //send request - Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<String, SocketChannel> replicaSocket = iterator.next(); - SocketChannel clientSocket = replicaSocket.getValue(); - NetworkingUtil.transferBufferToChannel(clientSocket, dataBuffer); - dataBuffer.position(0); - } - - iterator = replicaSockets.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<String, SocketChannel> replicaSocket = iterator.next(); - SocketChannel clientSocket = replicaSocket.getValue(); - //read response - NetworkingUtil.readBytes(clientSocket, dataBuffer, Long.BYTES); - maxRemoteLSN = Math.max(maxRemoteLSN, dataBuffer.getLong()); - } - } finally { - closeReplicaSockets(replicaSockets); - } - - return maxRemoteLSN; - } - - //Recovery Method - @Override - public void requestReplicaFiles(String selectedReplicaId, Set<Integer> partitionsToRecover, - Set<String> existingFiles) throws IOException { - ReplicaFilesRequest request = new ReplicaFilesRequest(partitionsToRecover, existingFiles); - dataBuffer = ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request); - - try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) { - - //transfer request - NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer); - - String indexPath; - String destFilePath; - ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer); - LSMIndexFileProperties fileProperties; - while (responseFunction != ReplicationRequestType.GOODBYE) { - dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer); - - fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer); - - //get index path - indexPath = replicaResourcesManager.getIndexPath(fileProperties); - destFilePath = indexPath + File.separator + fileProperties.getFileName(); - - //create file - File destFile = new File(destFilePath); - destFile.createNewFile(); - - try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw"); - FileChannel fileChannel = fileOutputStream.getChannel();) { - fileOutputStream.setLength(fileProperties.getFileSize()); - - NetworkingUtil.downloadFile(fileChannel, socketChannel); - fileChannel.force(true); - } - - //we need to create initial map for .metadata files that belong to remote replicas - if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) { - final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath()); - indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN()); - } - responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer); - } - - //send goodbye - ReplicationProtocol.sendGoodbye(socketChannel); - } - } - - public int getLogPageSize() { - return replicationProperties.getLogBufferPageSize(); - } - - @Override - public void replicateTxnLogBatch(final ByteBuffer buffer) { - //if replication is suspended, wait until it is resumed - try { - while (replicationSuspended.get()) { - synchronized (replicationSuspended) { - replicationSuspended.wait(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - //prepare the batch size buffer - txnLogsBatchSizeBuffer.clear(); - txnLogsBatchSizeBuffer.putInt(buffer.remaining()); - txnLogsBatchSizeBuffer.flip(); - - buffer.mark(); - for (SocketChannel replicaSocket : logsRepSockets) { - try { - //send batch size - NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer); - //send log - NetworkingUtil.transferBufferToChannel(replicaSocket, buffer); - } catch (IOException e) { - handleReplicationFailure(replicaSocket, e); - } finally { - txnLogsBatchSizeBuffer.position(0); - buffer.reset(); - } - } - //move the bufeer position to the sent limit - buffer.position(buffer.limit()); - } - - @Override - public void register(IPartitionReplica replica) { - // find the replica node based on ip and replication port - Optional<Replica> replicaNode = replicationStrategy.getRemoteReplicasAndSelf(nodeId).stream() - .filter(node -> node.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString()) - && node.getPort() == replica.getIdentifier().getLocation().getPort()) - .findAny(); - if (!replicaNode.isPresent()) { - throw new IllegalStateException("Couldn't find node for replica: " + replica); - } - Replica replicaRef = replicaNode.get(); - final String replicaId = replicaRef.getId(); - replicas.putIfAbsent(replicaId, replicaRef); - replica2PartitionsMap.computeIfAbsent(replicaId, k -> new HashSet<>()); - replica2PartitionsMap.get(replicaId).add(replica.getIdentifier().getPartition()); - updateReplicaInfo(replicaRef); - checkReplicaState(replicaId, false, true); - } - - //supporting classes - /** - * This class is responsible for processing replica events. - */ - private class ReplicasEventsMonitor extends Thread { - ReplicaEvent event; - - @Override - public void run() { - while (true) { - try { - event = replicaEventsQ.take(); - - switch (event.getEventType()) { - case NODE_FAILURE: - handleReplicaFailure(event.getReplica().getId()); - break; - case NODE_JOIN: - updateReplicaInfo(event.getReplica()); - checkReplicaState(event.getReplica().getId(), false, true); - break; - case NODE_SHUTTING_DOWN: - handleShutdownEvent(event.getReplica().getId()); - break; - default: - break; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - public void handleReplicaFailure(String replicaId) throws InterruptedException { - Replica replica = replicas.get(replicaId); - - if (replica.getState() == ReplicaState.DEAD) { - return; - } - - updateReplicaState(replicaId, ReplicaState.DEAD, true); - - //delete any invalid LSMComponents for this replica - replicaResourcesManager.cleanInvalidLSMComponents(replicaId); - } - - public void handleShutdownEvent(String replicaId) { - synchronized (shuttingDownReplicaIds) { - shuttingDownReplicaIds.add(replicaId); - shuttingDownReplicaIds.notifyAll(); - } - } - } - - /** - * This class process is responsible for processing ASYNC replication job. - */ - private class ReplicationJobsProccessor extends Thread { - Map<String, SocketChannel> replicaSockets; - ByteBuffer reusableBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); - - @Override - public void run() { - Thread.currentThread().setName("ReplicationJobsProccessor Thread"); - terminateJobsReplication.set(false); - jobsReplicationSuspended.set(false); - - while (true) { - try { - if (terminateJobsReplication.get()) { - closeSockets(); - break; - } - - IReplicationJob job = replicationJobsQ.take(); - if (job == REPLICATION_JOB_POISON_PILL) { - terminateJobsReplication.set(true); - continue; - } - - //if there isn't already a connection, establish a new one - if (replicaSockets == null) { - replicaSockets = getActiveRemoteReplicasSockets(); - } - processJob(job, replicaSockets, reusableBuffer); - - //if no more jobs to process, close sockets - if (replicationJobsQ.isEmpty()) { - LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas"); - closeSockets(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - LOGGER.warn("Couldn't complete processing replication job", e); - } - } - - synchronized (jobsReplicationSuspended) { - jobsReplicationSuspended.set(true); - jobsReplicationSuspended.notifyAll(); - } - LOGGER.log(Level.INFO, "ReplicationJobsProccessor stopped. "); - } - - private void closeSockets() { - if (replicaSockets != null) { - closeReplicaSockets(replicaSockets); - replicaSockets.clear(); - replicaSockets = null; - } - } - } - - /** - * This class is responsible for listening on sockets that belong to TxnLogsReplicator. - */ - private class TxnLogsReplicationResponseListener implements Runnable { - final SocketChannel replicaSocket; - final String replicaId; - - public TxnLogsReplicationResponseListener(String replicaId, SocketChannel replicaSocket) { - this.replicaId = replicaId; - this.replicaSocket = replicaSocket; - } - - @Override - public void run() { - Thread.currentThread().setName("TxnLogs Replication Listener Thread"); - LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress()); - - try (BufferedReader incomingResponse = - new BufferedReader(new InputStreamReader(replicaSocket.socket().getInputStream()))) { - while (true) { - String responseLine = incomingResponse.readLine(); - if (responseLine == null) { - break; - } - //read ACK for job commit log - String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine); - int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine); - addAckToJob(jobId, ackFrom); - } - } catch (AsynchronousCloseException e) { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e); - } - } catch (IOException e) { - handleReplicationFailure(replicaSocket, e); - } - } - } - - @Override - public IReplicationStrategy getReplicationStrategy() { - return replicationStrategy; + LOGGER.info("Closing replication channel"); + appCtx.getReplicationChannel().close(); + LOGGER.info("Replication manager stopped"); } -} +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java index 2c1937b..84922cd 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java @@ -25,12 +25,11 @@ import java.io.OutputStream; import java.util.Collection; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.replication.api.IReplicaTask; -import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.LocalResource; @@ -47,7 +46,7 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask { } @Override - public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException { final IIndexCheckpointManagerProvider indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); PersistentLocalResourceRepository resRepo = http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java new file mode 100644 index 0000000..26c9577 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; + +/** + * A task to create a mask file for an incoming lsm component from master + */ +public class ComponentMaskTask implements IReplicaTask { + + private static final String COMPONENT_MASK_FILE_PREFIX = StorageConstants.MASK_FILE_PREFIX + "C_"; + private final String file; + private final String componentId; + + public ComponentMaskTask(String file, String componentId) { + this.file = file; + this.componentId = componentId; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { + try { + // create mask + final Path maskPath = getComponentMaskPath(appCtx, file); + Files.createFile(maskPath); + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + public static Path getComponentMaskPath(INcApplicationContext appCtx, String file) throws IOException { + final IIOManager ioManager = appCtx.getIoManager(); + final FileReference localPath = ioManager.resolve(file); + final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath()); + return Paths.get(resourceDir.toString(), COMPONENT_MASK_FILE_PREFIX + localPath.getFile().getName()); + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.LSM_COMPONENT_MASK; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + final DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(file); + dos.writeUTF(componentId); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static ComponentMaskTask create(DataInput input) throws IOException { + String indexFile = input.readUTF(); + String componentId = input.readUTF(); + return new ComponentMaskTask(indexFile, componentId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java index d4de3b7..1b5470d 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java @@ -27,9 +27,8 @@ import java.nio.file.Files; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.replication.api.IReplicaTask; -import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.logging.log4j.LogManager; @@ -48,7 +47,7 @@ public class DeleteFileTask implements IReplicaTask { } @Override - public void perform(INcApplicationContext appCtx, IReplicationThread worker) { + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { try { final IIOManager ioManager = appCtx.getIoManager(); final File localFile = ioManager.resolve(file).getFile(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java new file mode 100644 index 0000000..b7f0985 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.util.IoUtil; + +/** + * A task to drop an index that was dropped on master + */ +public class DropIndexTask implements IReplicaTask { + + private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName()); + private final String file; + + public DropIndexTask(String file) { + this.file = file; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { + try { + final IIOManager ioManager = appCtx.getIoManager(); + final File indexFile = ioManager.resolve(file).getFile(); + if (indexFile.exists()) { + File indexDir = indexFile.getParentFile(); + IoUtil.deleteDirectory(indexDir); + LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath()); + } else { + LOGGER.warning(() -> "Requested to delete a non-existing index: " + indexFile.getAbsolutePath()); + } + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (IOException e) { + throw new ReplicationException(e); + } + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(file); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static DropIndexTask create(DataInput input) throws IOException { + return new DropIndexTask(input.readUTF()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java new file mode 100644 index 0000000..b581321 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.replication.messaging; + +import java.io.DataInput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; + +/** + * A task to mark a replicated LSM component as valid + */ +public class MarkComponentValidTask implements IReplicaTask { + + private final long masterLsn; + private final String file; + + public MarkComponentValidTask(String file, long masterLsn) { + this.file = file; + this.masterLsn = masterLsn; + } + + @Override + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { + try { + if (masterLsn > 0) { + ensureComponentLsnFlushed(appCtx); + } + // delete mask + final Path maskPath = ComponentMaskTask.getComponentMaskPath(appCtx, file); + Files.delete(maskPath); + ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); + } catch (IOException | InterruptedException e) { + throw new ReplicationException(e); + } + } + + private void ensureComponentLsnFlushed(INcApplicationContext appCtx) + throws HyracksDataException, InterruptedException { + final ResourceReference indexRef = ResourceReference.of(file); + final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); + final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); + long replicationTimeOut = TimeUnit.SECONDS.toMillis(appCtx.getReplicationProperties().getReplicationTimeOut()); + final long startTime = System.nanoTime(); + synchronized (indexCheckpointManager) { + // wait until the lsn mapping is flushed to disk + while (!indexCheckpointManager.isFlushed(masterLsn)) { + if (replicationTimeOut <= 0) { + throw new ReplicationException(new TimeoutException("Couldn't receive flush lsn from master")); + } + indexCheckpointManager.wait(replicationTimeOut); + replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + } + final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()); + indexCheckpointManager.replicated(componentEndTime, masterLsn); + } + } + + @Override + public ReplicationProtocol.ReplicationRequestType getMessageType() { + return ReplicationProtocol.ReplicationRequestType.MARK_COMPONENT_VALID; + } + + @Override + public void serialize(OutputStream out) throws HyracksDataException { + try { + final DataOutputStream dos = new DataOutputStream(out); + dos.writeUTF(file); + dos.writeLong(masterLsn); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + public static MarkComponentValidTask create(DataInput input) throws IOException { + final String indexFile = input.readUTF(); + final long lsn = input.readLong(); + return new MarkComponentValidTask(indexFile, lsn); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java index 85b7bb9..561a6bf 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.asterix.replication.api.IReplicationMessage; -import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.hyracks.api.exceptions.HyracksDataException; public class PartitionResourcesListResponse implements IReplicationMessage { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index b2b8ac6..b972f32 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -26,10 +26,10 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.IReplicaTask; -import org.apache.asterix.replication.functions.ReplicationProtocol; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; /** @@ -44,11 +44,12 @@ public class PartitionResourcesListTask implements IReplicaTask { } @Override - public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException { //TODO delete any invalid files with masks - final List<String> partitionResources = - appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream() - .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition, partitionResources); ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java index 45d8971..99c7256 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -30,10 +30,12 @@ import java.nio.file.Paths; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.common.storage.IIndexCheckpointManager; +import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.replication.api.IReplicaTask; -import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -49,14 +51,16 @@ public class ReplicateFileTask implements IReplicaTask { private static final Logger LOGGER = LogManager.getLogger(); private final String file; private final long size; + private final boolean indexMetadata; - public ReplicateFileTask(String file, long size) { + public ReplicateFileTask(String file, long size, boolean indexMetadata) { this.file = file; this.size = size; + this.indexMetadata = indexMetadata; } @Override - public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException { + public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { try { final IIOManager ioManager = appCtx.getIoManager(); // resolve path @@ -76,6 +80,9 @@ public class ReplicateFileTask implements IReplicaTask { NetworkingUtil.downloadFile(fileChannel, worker.getChannel()); fileChannel.force(true); } + if (indexMetadata) { + initIndexCheckpoint(appCtx); + } //delete mask Files.delete(maskPath); LOGGER.info(() -> "Replicated file: " + localPath); @@ -85,6 +92,16 @@ public class ReplicateFileTask implements IReplicaTask { } } + private void initIndexCheckpoint(INcApplicationContext appCtx) throws HyracksDataException { + final ResourceReference indexRef = ResourceReference.of(file); + final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); + final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); + final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN(); + indexCheckpointManager.delete(); + indexCheckpointManager.init(currentLSN); + LOGGER.info(() -> "Checkpoint index: " + indexRef); + } + @Override public ReplicationProtocol.ReplicationRequestType getMessageType() { return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE; @@ -96,6 +113,7 @@ public class ReplicateFileTask implements IReplicaTask { DataOutputStream dos = new DataOutputStream(out); dos.writeUTF(file); dos.writeLong(size); + dos.writeBoolean(indexMetadata); } catch (IOException e) { throw HyracksDataException.create(e); } @@ -104,6 +122,7 @@ public class ReplicateFileTask implements IReplicaTask { public static ReplicateFileTask create(DataInput input) throws IOException { final String s = input.readUTF(); final long i = input.readLong(); - return new ReplicateFileTask(s, i); + final boolean isMetadata = input.readBoolean(); + return new ReplicateFileTask(s, i, isMetadata); } } \ No newline at end of file
