This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bc7bc2e80af1e277ffb4906e6aca416a9de954b
Author: Krzysztof Białek <[email protected]>
AuthorDate: Tue Sep 18 11:18:03 2018 +0200

    [FLINK-8660][ha] Enable user to provide custom HAServices implementation
    
    Create BlobStorage for any HA backend
    
    HighAvailabilityServicesFactory may throw exceptions
    
    Docs
    
    Use ha mode config property to specify factory class FQN
    
    Update docs
---
 docs/_includes/generated/common_section.html       |  2 +-
 .../generated/high_availability_configuration.html |  2 +-
 docs/ops/jobmanager_high_availability.md           |  1 +
 .../configuration/HighAvailabilityOptions.java     |  4 +-
 .../org/apache/flink/runtime/blob/BlobUtils.java   |  8 +--
 .../HighAvailabilityServicesFactory.java           | 39 ++++++++++
 .../HighAvailabilityServicesUtils.java             | 28 +++++++-
 .../runtime/jobmanager/HighAvailabilityMode.java   | 29 ++++----
 .../HighAvailabilityServicesUtilsTest.java         | 83 ++++++++++++++++++++++
 .../jobmanager/HighAvailabilityModeTest.java       | 33 ++++++++-
 10 files changed, 203 insertions(+), 26 deletions(-)

diff --git a/docs/_includes/generated/common_section.html 
b/docs/_includes/generated/common_section.html
index ea881e0..65804db 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -45,7 +45,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. 
To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. 
To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of 
factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.storageDir</h5></td>
diff --git a/docs/_includes/generated/high_availability_configuration.html 
b/docs/_includes/generated/high_availability_configuration.html
index b1e2ea9..398379d 100644
--- a/docs/_includes/generated/high_availability_configuration.html
+++ b/docs/_includes/generated/high_availability_configuration.html
@@ -10,7 +10,7 @@
         <tr>
             <td><h5>high-availability</h5></td>
             <td style="word-wrap: break-word;">"NONE"</td>
-            <td>Defines high-availability mode used for the cluster execution. 
To enable high-availability, set this mode to "ZOOKEEPER".</td>
+            <td>Defines high-availability mode used for the cluster execution. 
To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of 
factory class.</td>
         </tr>
         <tr>
             <td><h5>high-availability.cluster-id</h5></td>
diff --git a/docs/ops/jobmanager_high_availability.md 
b/docs/ops/jobmanager_high_availability.md
index 320f9d6..2c2f1a6 100644
--- a/docs/ops/jobmanager_high_availability.md
+++ b/docs/ops/jobmanager_high_availability.md
@@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for 
inter process communic
 In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
 
 - **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
+Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
 
   <pre>high-availability: zookeeper</pre>
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 787efff..5881533 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -43,6 +43,7 @@ public class HighAvailabilityOptions {
         * Defines high-availability mode used for the cluster execution.
         * A value of "NONE" signals no highly available setup.
         * To enable high-availability, set this mode to "ZOOKEEPER".
+        * Can also be set to FQN of HighAvailability factory class.
         */
        @Documentation.CommonOption(position = 
Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
        public static final ConfigOption<String> HA_MODE =
@@ -50,7 +51,7 @@ public class HighAvailabilityOptions {
                        .defaultValue("NONE")
                        .withDeprecatedKeys("recovery.mode")
                        .withDescription("Defines high-availability mode used 
for the cluster execution." +
-                               " To enable high-availability, set this mode to 
\"ZOOKEEPER\".");
+                               " To enable high-availability, set this mode to 
\"ZOOKEEPER\" or specify FQN of factory class.");
 
        /**
         * The ID of the Flink cluster, used to separate multiple Flink clusters
@@ -73,7 +74,6 @@ public class HighAvailabilityOptions {
                        
.withDeprecatedKeys("high-availability.zookeeper.storageDir", 
"recovery.zookeeper.storageDir")
                        .withDescription("File system path (URI) where Flink 
persists metadata in high-availability setups.");
 
-
        // 
------------------------------------------------------------------------
        //  Recovery Options
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a61d679..e1caa9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -88,14 +88,10 @@ public class BlobUtils {
         *              thrown if the (distributed) file storage cannot be 
created
         */
        public static BlobStoreService createBlobStoreFromConfig(Configuration 
config) throws IOException {
-               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
-
-               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-                       return new VoidBlobStore();
-               } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
+               if 
(HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
                        return createFileSystemBlobStore(config);
                } else {
-                       throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + "'.");
+                       return new VoidBlobStore();
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
new file mode 100644
index 0000000..0abc6de
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory interface for {@link HighAvailabilityServices}.
+ */
+public interface HighAvailabilityServicesFactory {
+
+       /**
+        * Creates an {@link HighAvailabilityServices} instance.
+        *
+        * @param configuration Flink configuration
+        * @param executor background task executor
+        * @return instance of {@link HighAvailabilityServices}
+        * @throws Exception when HAServices cannot be created
+        */
+       HighAvailabilityServices createHAServices(Configuration configuration, 
Executor executor) throws Exception;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 778d2db..78484d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.blob.BlobStoreService;
@@ -64,11 +65,14 @@ public class HighAvailabilityServicesUtils {
                                        config,
                                        blobStoreService);
 
+                       case FACTORY_CLASS:
+                               return createCustomHAServices(config, executor);
+
                        default:
                                throw new Exception("High availability mode " + 
highAvailabilityMode + " is not supported.");
                }
        }
-       
+
        public static HighAvailabilityServices createHighAvailabilityServices(
                Configuration configuration,
                Executor executor,
@@ -76,7 +80,7 @@ public class HighAvailabilityServicesUtils {
 
                HighAvailabilityMode highAvailabilityMode = 
LeaderRetrievalUtils.getRecoveryMode(configuration);
 
-               switch(highAvailabilityMode) {
+               switch (highAvailabilityMode) {
                        case NONE:
                                final Tuple2<String, Integer> hostnamePort = 
getJobManagerAddress(configuration);
 
@@ -119,6 +123,10 @@ public class HighAvailabilityServicesUtils {
                                        executor,
                                        configuration,
                                        blobStoreService);
+
+                       case FACTORY_CLASS:
+                               return createCustomHAServices(configuration, 
executor);
+
                        default:
                                throw new Exception("Recovery mode " + 
highAvailabilityMode + " is not supported.");
                }
@@ -151,6 +159,22 @@ public class HighAvailabilityServicesUtils {
                return Tuple2.of(hostname, port);
        }
 
+       private static HighAvailabilityServices 
createCustomHAServices(Configuration config, Executor executor) throws 
Exception {
+               Class<HighAvailabilityServicesFactory> factoryClass;
+               try {
+                       factoryClass = config.getClass(
+                               HighAvailabilityOptions.HA_MODE.key(), null, 
Thread.currentThread().getContextClassLoader());
+               } catch (ClassNotFoundException e) {
+                       throw new Exception("Custom HA FactoryClass not found");
+               }
+
+               if (factoryClass != null && 
HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
+                       return 
factoryClass.newInstance().createHAServices(config, executor);
+               } else {
+                       throw new Exception("Custom HA FactoryClass is not 
valid.");
+               }
+       }
+
        public enum AddressResolution {
                TRY_ADDRESS_RESOLUTION,
                NO_ADDRESS_RESOLUTION
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
index 7dc13c2..65c202a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
@@ -30,10 +30,19 @@ import 
org.apache.flink.configuration.HighAvailabilityOptions;
  * ZooKeeper is used to select a leader among a group of JobManager. This 
JobManager
  * is responsible for the job execution. Upon failure of the leader a new 
leader is elected
  * which will take over the responsibilities of the old leader
+ * - FACTORY_CLASS: Use implementation of {@link 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory}
+ * specified in configuration property high-availability
  */
 public enum HighAvailabilityMode {
-       NONE,
-       ZOOKEEPER;
+       NONE(false),
+       ZOOKEEPER(true),
+       FACTORY_CLASS(true);
+
+       private final boolean haActive;
+
+       HighAvailabilityMode(boolean haActive) {
+               this.haActive = haActive;
+       }
 
        /**
         * Return the configured {@link HighAvailabilityMode}.
@@ -51,7 +60,11 @@ public enum HighAvailabilityMode {
                        // Map old default to new default
                        return HighAvailabilityMode.NONE;
                } else {
-                       return 
HighAvailabilityMode.valueOf(haMode.toUpperCase());
+                       try {
+                               return 
HighAvailabilityMode.valueOf(haMode.toUpperCase());
+                       } catch (IllegalArgumentException e) {
+                               return FACTORY_CLASS;
+                       }
                }
        }
 
@@ -63,14 +76,6 @@ public enum HighAvailabilityMode {
         */
        public static boolean isHighAvailabilityModeActivated(Configuration 
configuration) {
                HighAvailabilityMode mode = fromConfig(configuration);
-               switch (mode) {
-                       case NONE:
-                               return false;
-                       case ZOOKEEPER:
-                               return true;
-                       default:
-                               return false;
-               }
-
+               return mode.haActive;
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
new file mode 100644
index 0000000..e9063ac
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link HighAvailabilityServicesUtils} class.
+ */
+public class HighAvailabilityServicesUtilsTest extends TestLogger {
+
+       @Test
+       public void testCreateCustomHAServices() throws Exception {
+               Configuration config = new Configuration();
+
+               HighAvailabilityServices haServices = 
Mockito.mock(HighAvailabilityServices.class);
+               TestHAFactory.haServices = haServices;
+
+               Executor executor = Mockito.mock(Executor.class);
+
+               config.setString(HighAvailabilityOptions.HA_MODE, 
TestHAFactory.class.getName());
+
+               // when
+               HighAvailabilityServices actualHaServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, 
executor);
+
+               // then
+               assertSame(haServices, actualHaServices);
+
+               // when
+               actualHaServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor,
+                       
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+               // then
+               assertSame(haServices, actualHaServices);
+       }
+
+       @Test(expected = Exception.class)
+       public void testCustomHAServicesFactoryNotDefined() throws Exception {
+               Configuration config = new Configuration();
+
+               Executor executor = Mockito.mock(Executor.class);
+
+               config.setString(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+
+               // expect
+               
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, 
executor);
+       }
+
+       private static class TestHAFactory implements 
HighAvailabilityServicesFactory {
+
+               static HighAvailabilityServices haServices;
+
+               @Override
+               public HighAvailabilityServices createHAServices(Configuration 
configuration, Executor executor) {
+                       return haServices;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
index 91fb514..e708c10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java
@@ -21,15 +21,20 @@ package org.apache.flink.runtime.jobmanager;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class HighAvailabilityModeTest {
+/**
+ * Tests for the {@link HighAvailabilityMode}.
+ */
+public class HighAvailabilityModeTest extends TestLogger {
 
        // Default HA mode
-       private final static HighAvailabilityMode DEFAULT_HA_MODE = 
HighAvailabilityMode.valueOf(
+       private static final HighAvailabilityMode DEFAULT_HA_MODE = 
HighAvailabilityMode.valueOf(
                        ConfigConstants.DEFAULT_HA_MODE.toUpperCase());
 
        /**
@@ -45,6 +50,10 @@ public class HighAvailabilityModeTest {
                // Check not equals default
                config.setString(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
                assertEquals(HighAvailabilityMode.ZOOKEEPER, 
HighAvailabilityMode.fromConfig(config));
+
+               // Check factory class
+               config.setString(HighAvailabilityOptions.HA_MODE, 
"factory.class.FQN");
+               assertEquals(HighAvailabilityMode.FACTORY_CLASS, 
HighAvailabilityMode.fromConfig(config));
        }
 
        /**
@@ -69,4 +78,24 @@ public class HighAvailabilityModeTest {
                assertEquals(HighAvailabilityMode.NONE, 
HighAvailabilityMode.fromConfig(config));
        }
 
+       @Test
+       public void testCheckHighAvailabilityModeActivated() throws Exception {
+               Configuration config = new Configuration();
+
+               // check defaults
+               
assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+               // check NONE
+               config.setString("high-availability", 
HighAvailabilityMode.NONE.name().toLowerCase());
+               
assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+               // check ZOOKEEPER
+               config.setString("high-availability", 
HighAvailabilityMode.ZOOKEEPER.name().toLowerCase());
+               
assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+
+               // check FACTORY_CLASS
+               config.setString("high-availability", 
HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase());
+               
assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config));
+       }
+
 }

Reply via email to