This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e4bf05  [NO ISSUE][REP] Extensible Replication Strategy Factory
3e4bf05 is described below

commit 3e4bf05fee9bdb879f0256647e532e2499d91c67
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sat Aug 14 21:01:57 2021 +0300

    [NO ISSUE][REP] Extensible Replication Strategy Factory
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Allow specifying replication strategy factory at
      NC application level.
    - Add replica getter to replica manager.
    
    Change-Id: I7c71ae2d19c81050c4e338efac155ae39f4b202e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12825
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  6 +++--
 .../org/apache/asterix/app/nc/ReplicaManager.java  |  5 ++++
 .../asterix/hyracks/bootstrap/NCApplication.java   |  8 +++++-
 .../asterix/common/api/INcApplicationContext.java  |  4 ++-
 .../replication/IReplicationStrategyFactory.java   | 30 ++++++++++++++++++++++
 .../replication/ReplicationStrategyFactory.java    |  9 +++----
 .../asterix/common/storage/IReplicaManager.java    |  7 +++++
 .../replication/management/ReplicationManager.java |  7 ++---
 8 files changed, 63 insertions(+), 13 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index eb8a92f..f532352 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -53,6 +53,7 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -176,7 +177,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
 
     @Override
     public void initialize(IRecoveryManagerFactory recoveryManagerFactory, 
IReceptionistFactory receptionistFactory,
-            IConfigValidatorFactory configValidatorFactory, boolean 
initialRun) throws IOException {
+            IConfigValidatorFactory configValidatorFactory, 
IReplicationStrategyFactory replicationStrategyFactory,
+            boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
         int ioQueueLen = 
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
@@ -231,7 +233,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         receptionist = receptionistFactory.create();
 
         if (replicationProperties.isReplicationEnabled()) {
-            replicationManager = new ReplicationManager(this, 
replicationProperties);
+            replicationManager = new ReplicationManager(this, 
replicationStrategyFactory, replicationProperties);
 
             //pass replication manager to replication required object
             //LogManager to replicate logs
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 655f9da..954ff8d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -101,6 +101,11 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     @Override
+    public synchronized IPartitionReplica getReplica(ReplicaIdentifier id) {
+        return replicas.get(id);
+    }
+
+    @Override
     public synchronized Set<Integer> getPartitions() {
         return Collections.unmodifiableSet(partitions);
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b29843c..18e8e65 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -69,6 +69,8 @@ import org.apache.asterix.common.config.PropertiesAccessor;
 import org.apache.asterix.common.config.PropertiesFactory;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -163,7 +165,7 @@ public class NCApplication extends BaseNCApplication {
             updateOnNodeJoin();
         }
         runtimeContext.initialize(getRecoveryManagerFactory(), 
getReceptionistFactory(), getConfigValidatorFactory(),
-                runtimeContext.getNodeProperties().isInitialRun());
+                getReplicationStrategyFactory(), 
runtimeContext.getNodeProperties().isInitialRun());
         MessagingProperties messagingProperties = 
runtimeContext.getMessagingProperties();
         NCMessageBroker messageBroker = new NCMessageBroker(controllerService, 
messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
@@ -194,6 +196,10 @@ public class NCApplication extends BaseNCApplication {
         return Receptionist::new;
     }
 
+    protected IReplicationStrategyFactory getReplicationStrategyFactory() {
+        return new ReplicationStrategyFactory();
+    }
+
     protected IConfigValidatorFactory getConfigValidatorFactory() {
         return ConfigValidator::new;
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8c82979..5475b97 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -26,6 +26,7 @@ import 
org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
@@ -71,7 +72,8 @@ public interface INcApplicationContext extends 
IApplicationContext {
     IResourceIdFactory getResourceIdFactory();
 
     void initialize(IRecoveryManagerFactory recoveryManagerFactory, 
IReceptionistFactory receptionistFactory,
-            IConfigValidatorFactory configValidatorFactory, boolean 
initialRun) throws IOException, AlgebricksException;
+            IConfigValidatorFactory configValidatorFactory, 
IReplicationStrategyFactory replicationStrategyFactory,
+            boolean initialRun) throws IOException, AlgebricksException;
 
     void setShuttingdown(boolean b);
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java
new file mode 100644
index 0000000..384cd65
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+public interface IReplicationStrategyFactory {
+
+    /**
+     * Creates a replication strategy based on the provided {@code name}
+     *
+     * @param name the name of the replication strategy to create
+     * @return the replication strategy
+     */
+    IReplicationStrategy create(String name);
+}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
index 9c129d8..935b08f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -21,7 +21,7 @@ package org.apache.asterix.common.replication;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ReplicationStrategyFactory {
+public class ReplicationStrategyFactory implements IReplicationStrategyFactory 
{
 
     private static final Map<String, Class<? extends IReplicationStrategy>> 
BUILT_IN_REPLICATION_STRATEGY =
             new HashMap<>();
@@ -32,11 +32,8 @@ public class ReplicationStrategyFactory {
         BUILT_IN_REPLICATION_STRATEGY.put("metadata", 
MetadataOnlyReplicationStrategy.class);
     }
 
-    private ReplicationStrategyFactory() {
-        throw new AssertionError();
-    }
-
-    public static IReplicationStrategy create(String name) {
+    @Override
+    public IReplicationStrategy create(String name) {
         String strategyName = name.toLowerCase();
         if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
             throw new IllegalStateException("Couldn't find strategy with name: 
" + name);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 1b8ec53..b081c65 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -79,4 +79,11 @@ public interface IReplicaManager {
      * @return the synchronization lock
      */
     Object getReplicaSyncLock();
+
+    /**
+     * Gets the partition replica matching {@code id}
+     * @param id
+     * @return The partition replica if found. Otherwise, null.
+     */
+    IPartitionReplica getReplica(ReplicaIdentifier id);
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 7ed674e..5cd26de 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -30,7 +30,7 @@ import 
org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.ReplicationStrategyFactory;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.api.ReplicationDestination;
 import org.apache.hyracks.api.replication.IReplicationJob;
@@ -50,10 +50,11 @@ public class ReplicationManager implements 
IReplicationManager {
     private final LogReplicationManager logReplicationManager;
     private final IndexReplicationManager lsnIndexReplicationManager;
 
-    public ReplicationManager(INcApplicationContext appCtx, 
ReplicationProperties replicationProperties) {
+    public ReplicationManager(INcApplicationContext appCtx, 
IReplicationStrategyFactory replicationStrategyFactory,
+            ReplicationProperties replicationProperties) {
         this.replicationProperties = replicationProperties;
         this.appCtx = appCtx;
-        strategy = 
ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy());
+        strategy = 
replicationStrategyFactory.create(replicationProperties.getReplicationStrategy());
         logReplicationManager = new LogReplicationManager(appCtx, this);
         lsnIndexReplicationManager = new IndexReplicationManager(appCtx, this);
     }

Reply via email to