This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0799b5c  [FLINK-17384][connectors/hbase] Support reading hbase conf 
dir from flink-conf.yaml
0799b5c is described below

commit 0799b5c20a127110e47439668cf8f8db2e4ecbf3
Author: liuyongvs <[email protected]>
AuthorDate: Thu May 14 15:48:50 2020 +0800

    [FLINK-17384][connectors/hbase] Support reading hbase conf dir from 
flink-conf.yaml
    
    This closes #12144.
---
 .../generated/environment_configuration.html       |   6 +
 .../connector/hbase/HBaseDynamicTableFactory.java  |   4 +-
 .../flink/connector/hbase/HBaseTableFactory.java   |   4 +-
 .../hbase/sink/HBaseDynamicTableSink.java          |   4 +-
 .../connector/hbase/sink/HBaseSinkFunction.java    |   3 +-
 .../connector/hbase/sink/HBaseUpsertTableSink.java |   4 +-
 .../hbase/source/AbstractTableInputFormat.java     |   3 +-
 .../hbase/source/HBaseLookupFunction.java          |   3 +-
 .../hbase/util/HBaseConfigurationUtil.java         |  66 +++++++-
 .../hbase/util/HBaseConfigLoadingTest.java         | 176 +++++++++++++++++++++
 .../src/test/resources/hbase-site.xml              |  29 ++++
 .../apache/flink/configuration/CoreOptions.java    |  11 ++
 flink-dist/src/main/flink-bin/bin/config.sh        |  24 +++
 13 files changed, 322 insertions(+), 15 deletions(-)

diff --git a/docs/_includes/generated/environment_configuration.html 
b/docs/_includes/generated/environment_configuration.html
index 912ecbb..0fffb88 100644
--- a/docs/_includes/generated/environment_configuration.html
+++ b/docs/_includes/generated/environment_configuration.html
@@ -15,6 +15,12 @@
             <td>Path to hadoop configuration directory. It is required to read 
HDFS and/or YARN configuration. You can also set it via environment 
variable.</td>
         </tr>
         <tr>
+            <td><h5>env.hbase.conf.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Path to hbase configuration directory. It is required to read 
HBASE configuration. You can also set it via environment variable.</td>
+        </tr>
+        <tr>
             <td><h5>env.java.opts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index b0e6dee..48b70eb 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -34,7 +35,6 @@ import 
org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.time.Duration;
@@ -108,7 +108,7 @@ public class HBaseDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
 
                String hTableName = helper.getOptions().get(TABLE_NAME);
                // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
-               Configuration hbaseClientConf = HBaseConfiguration.create();
+               Configuration hbaseClientConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
                hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
helper.getOptions().get(ZOOKEEPER_QUORUM));
                hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, 
helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
                String nullStringLiteral = 
helper.getOptions().get(NULL_STRING_LITERAL);
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
index f64f9b9..ca3e1e5 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseUpsertTableSink;
 import org.apache.flink.connector.hbase.source.HBaseTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -38,7 +39,6 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.sql.Date;
@@ -83,7 +83,7 @@ public class HBaseTableFactory implements 
StreamTableSourceFactory<Row>, StreamT
        public StreamTableSource<Row> createStreamTableSource(Map<String, 
String> properties) {
                final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
                // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
-               Configuration hbaseClientConf = HBaseConfiguration.create();
+               Configuration hbaseClientConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
                String hbaseZk = 
descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
                hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
                descriptorProperties
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
index 193a09a..5d60ae3 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,7 +31,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 /**
@@ -57,7 +57,7 @@ public class HBaseDynamicTableSink implements 
DynamicTableSink {
 
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-               Configuration hbaseClientConf = HBaseConfiguration.create();
+               Configuration hbaseClientConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
                hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
hbaseOptions.getZkQuorum());
                hbaseOptions.getZkNodeParent().ifPresent(v -> 
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
                HBaseSinkFunction<RowData> sinkFunction = new 
HBaseSinkFunction<>(
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 3d96e5d..e9d16bc 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -152,7 +151,7 @@ public class HBaseSinkFunction<T>
                // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
                // and overwrite configuration using serialized configuration 
from client-side env (`hbase-site.xml` in classpath).
                // user params from client-side have the highest priority
-               org.apache.hadoop.conf.Configuration runtimeConfig = 
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, 
HBaseConfiguration.create());
+               org.apache.hadoop.conf.Configuration runtimeConfig = 
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, 
HBaseConfigurationUtil.getHBaseConfiguration());
 
                // do validation: check key option(s) in final runtime 
configuration
                if 
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
 {
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
index 0f8b8eb..f882833 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -35,7 +36,6 @@ import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.util.Arrays;
@@ -89,7 +89,7 @@ public class HBaseUpsertTableSink implements 
UpsertStreamTableSink<Row> {
 
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> dataStream) {
-               Configuration hbaseClientConf = HBaseConfiguration.create();
+               Configuration hbaseClientConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
                hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
hbaseOptions.getZkQuorum());
                hbaseOptions.getZkNodeParent().ifPresent(v -> 
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
                HBaseSinkFunction sinkFunction = new HBaseSinkFunction(
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 94d36d3..feeff3d 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.core.io.InputSplitAssigner;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -109,7 +108,7 @@ abstract class AbstractTableInputFormat<T> extends 
RichInputFormat<T, TableInput
        public abstract void configure(Configuration parameters);
 
        protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() 
{
-               return 
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, 
HBaseConfiguration.create());
+               return 
HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, 
HBaseConfigurationUtil.getHBaseConfiguration());
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..ed2807d 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -93,7 +92,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
                // user params from client-side have the highest priority
                org.apache.hadoop.conf.Configuration runtimeConfig = 
HBaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig,
-                       HBaseConfiguration.create());
+                       HBaseConfigurationUtil.getHBaseConfiguration());
 
                // do validation: check key option(s) in final runtime 
configuration
                if 
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
 {
diff --git 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
index a7b11e5..165f95a 100644
--- 
a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
+++ 
b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
@@ -22,20 +22,84 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 
 /**
- * This class helps to do serialization for hadoop Configuration.
+ * This class helps to do serialization for hadoop Configuration and 
HBase-related classes.
  */
 @Internal
 public class HBaseConfigurationUtil {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(HBaseConfigurationUtil.class);
+
+       public static Configuration getHBaseConfiguration() {
+
+               // Instantiate an HBaseConfiguration to load the 
hbase-default.xml and hbase-site.xml from the classpath.
+               Configuration result = HBaseConfiguration.create();
+               boolean foundHBaseConfiguration = false;
+
+               // We need to load both hbase-default.xml and hbase-site.xml to 
the hbase configuration
+               // The properties of a newly added resource will override the 
ones in previous resources, so a configuration
+               // file with higher priority should be added later.
+
+               // Approach 1: HBASE_HOME environment variables
+               String possibleHBaseConfPath = null;
+
+               final String hbaseHome = System.getenv("HBASE_HOME");
+               if (hbaseHome != null) {
+                       LOG.debug("Searching HBase configuration files in 
HBASE_HOME: {}", hbaseHome);
+                       possibleHBaseConfPath = hbaseHome + "/conf";
+               }
+
+               if (possibleHBaseConfPath != null) {
+                       foundHBaseConfiguration = addHBaseConfIfFound(result, 
possibleHBaseConfPath);
+               }
+
+               // Approach 2: HBASE_CONF_DIR environment variable
+               String hbaseConfDir = System.getenv("HBASE_CONF_DIR");
+               if (hbaseConfDir != null) {
+                       LOG.debug("Searching HBase configuration files in 
HBASE_CONF_DIR: {}", hbaseConfDir);
+                       foundHBaseConfiguration = addHBaseConfIfFound(result, 
hbaseConfDir) || foundHBaseConfiguration;
+               }
+
+               if (!foundHBaseConfiguration) {
+                       LOG.warn("Could not find HBase configuration via any of 
the supported methods " +
+                               "(Flink configuration, environment 
variables).");
+               }
+
+               return result;
+       }
+
+       /**
+        * Search HBase configuration files in the given path, and add them to 
the configuration if found.
+        */
+       private static boolean addHBaseConfIfFound(Configuration configuration, 
String possibleHBaseConfPath) {
+               boolean foundHBaseConfiguration = false;
+               if (new File(possibleHBaseConfPath).exists()) {
+                       if (new File(possibleHBaseConfPath + 
"/hbase-default.xml").exists()) {
+                               configuration.addResource(new 
org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-default.xml"));
+                               LOG.debug("Adding " + possibleHBaseConfPath + 
"/hbase-default.xml to hbase configuration");
+                               foundHBaseConfiguration = true;
+                       }
+                       if (new File(possibleHBaseConfPath + 
"/hbase-site.xml").exists()) {
+                               configuration.addResource(new 
org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-site.xml"));
+                               LOG.debug("Adding " + possibleHBaseConfPath + 
"/hbase-site.xml to hbase configuration");
+                               foundHBaseConfiguration = true;
+                       }
+               }
+               return foundHBaseConfiguration;
+       }
+
        /**
         * Serialize a Hadoop {@link Configuration} into byte[].
         */
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
new file mode 100644
index 0000000..89aa23c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate the loading of the HBase configuration, relative to
+ * entries in the Flink configuration and the environment variables.
+ */
+public class HBaseConfigLoadingTest {
+
+       private static final String IN_HBASE_CONFIG_KEY = "hbase_conf_key";
+       private static final String IN_HBASE_CONFIG_VALUE = "hbase_conf_value!";
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void loadFromClasspathByDefault() {
+               org.apache.hadoop.conf.Configuration hbaseConf =
+                       HBaseConfigurationUtil.getHBaseConfiguration();
+
+               assertEquals(IN_HBASE_CONFIG_VALUE, 
hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+       }
+
+       @Test
+       public void loadFromEnvVariables() throws Exception {
+               final String k1 = "where?";
+               final String v1 = "I'm on a boat";
+               final String k2 = "when?";
+               final String v2 = "midnight";
+               final String k3 = "why?";
+               final String v3 = "what do you think?";
+               final String k4 = "which way?";
+               final String v4 = "south, always south...";
+
+               final File hbaseConfDir = tempFolder.newFolder();
+
+               final File hbaseHome = tempFolder.newFolder();
+
+               final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+               assertTrue(hbaseHomeConf.mkdirs());
+
+               final File file1 = new File(hbaseConfDir, "hbase-default.xml");
+               final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+               final File file3 = new File(hbaseHomeConf, "hbase-default.xml");
+               final File file4 = new File(hbaseHomeConf, "hbase-site.xml");
+
+               printConfig(file1, k1, v1);
+               printConfig(file2, k2, v2);
+               printConfig(file3, k3, v3);
+               printConfig(file4, k4, v4);
+
+               final org.apache.hadoop.conf.Configuration hbaseConf;
+
+               final Map<String, String> originalEnv = System.getenv();
+               final Map<String, String> newEnv = new HashMap<>(originalEnv);
+               newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+               newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+               try {
+                       CommonTestUtils.setEnv(newEnv);
+                       hbaseConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
+               }
+               finally {
+                       CommonTestUtils.setEnv(originalEnv);
+               }
+
+               // contains extra entries
+               assertEquals(v1, hbaseConf.get(k1, null));
+               assertEquals(v2, hbaseConf.get(k2, null));
+               assertEquals(v3, hbaseConf.get(k3, null));
+               assertEquals(v4, hbaseConf.get(k4, null));
+
+               // also contains classpath defaults
+               assertEquals(IN_HBASE_CONFIG_VALUE, 
hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+       }
+
+       @Test
+       public void loadOverlappingConfig() throws Exception {
+               final String k1 = "key1";
+
+               final String v1 = "from HBASE_HOME/conf";
+               final String v2 = "from HBASE_CONF_DIR";
+
+               final File hbaseHome = tempFolder.newFolder("hbaseHome");
+               final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+               final File hbaseConfDir = tempFolder.newFolder("hbaseConfDir");
+
+               assertTrue(hbaseHomeConf.mkdirs());
+               final File file1 = new File(hbaseHomeConf, "hbase-site.xml");
+
+               Map<String, String> properties1 = new HashMap<>();
+               properties1.put(k1, v1);
+               printConfigs(file1, properties1);
+
+               // HBASE_CONF_DIR conf will override k1 with v2
+               final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+               Map<String, String> properties2 = new HashMap<>();
+               properties2.put(k1, v2);
+               printConfigs(file2, properties2);
+
+               final org.apache.hadoop.conf.Configuration hbaseConf;
+
+               final Map<String, String> originalEnv = System.getenv();
+               final Map<String, String> newEnv = new HashMap<>(originalEnv);
+               newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+               newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+               try {
+                       CommonTestUtils.setEnv(newEnv);
+                       hbaseConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
+               }
+               finally {
+                       CommonTestUtils.setEnv(originalEnv);
+               }
+
+               // contains extra entries
+               assertEquals(v2, hbaseConf.get(k1, null));
+
+               // also contains classpath defaults
+               assertEquals(IN_HBASE_CONFIG_VALUE, 
hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+       }
+
+       private static void printConfig(File file, String key, String value) 
throws IOException {
+               Map<String, String> map = new HashMap<>(1);
+               map.put(key, value);
+               printConfigs(file, map);
+       }
+
+       private static void printConfigs(File file, Map<String, String> 
properties) throws IOException {
+               try (PrintStream out = new PrintStream(new 
FileOutputStream(file))) {
+                       out.println("<?xml version=\"1.0\"?>");
+                       out.println("<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>");
+                       out.println("<configuration>");
+                       for (Map.Entry<String, String> entry: 
properties.entrySet()) {
+                               out.println("\t<property>");
+                               out.println("\t\t<name>" + entry.getKey() + 
"</name>");
+                               out.println("\t\t<value>" + entry.getValue() + 
"</value>");
+                               out.println("\t</property>");
+                       }
+                       out.println("</configuration>");
+               }
+       }
+}
diff --git 
a/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml 
b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..1e58ef4
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Values used when running unit tests.  Specify any values in here that
+     should override the default values. -->
+
+<configuration>
+    <property>
+        <name>hbase_conf_key</name>
+        <value>hbase_conf_value!</value>
+    </property>
+</configuration>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 7d19352..eb78445 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -243,6 +243,17 @@ public class CoreOptions {
                .withDescription("Path to yarn configuration directory. It is 
required to run flink on YARN. You can also" +
                        " set it via environment variable.");
 
+       /**
+        * This options is here only for documentation generation, it is only
+        * evaluated in the shell scripts.
+        */
+       @SuppressWarnings("unused")
+       public static final ConfigOption<String> FLINK_HBASE_CONF_DIR = 
ConfigOptions
+               .key("env.hbase.conf.dir")
+               .noDefaultValue()
+               .withDescription("Path to hbase configuration directory. It is 
required to read HBASE configuration." +
+                       " You can also set it via environment variable.");
+
        // 
------------------------------------------------------------------------
        //  generic io
        // 
------------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 01d6308..39095b5 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -114,6 +114,7 @@ DEFAULT_ENV_JAVA_OPTS_CLI=""                        # 
Optional JVM args (Client)
 DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters 
running in cluster mode
 DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration 
Directory, if necessary
 DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration 
Directory, if necessary
+DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration 
Directory, if necessary
 
 
########################################################################################################################
 # CONFIG KEYS: The default values can be overwritten by the following keys in 
conf/flink-conf.yaml
@@ -126,6 +127,7 @@ KEY_ENV_LOG_DIR="env.log.dir"
 KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
 KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
 KEY_ENV_JAVA_HOME="env.java.home"
 KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
@@ -253,6 +255,10 @@ if [ -z "${HADOOP_CONF_DIR}" ]; then
     HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} 
"${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
 fi
 
+if [ -z "${HBASE_CONF_DIR}" ]; then
+    HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} 
"${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
 if [ -z "${FLINK_PID_DIR}" ]; then
     FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" 
"${YAML_CONF}")
 fi
@@ -344,6 +350,24 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
     fi
 fi
 
+# Check if deprecated HBASE_HOME is set, and specify config path to 
HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -n "$HBASE_HOME" ]; then
+        # HBASE_HOME is set.
+        if [ -d "$HBASE_HOME/conf" ]; then
+            HBASE_CONF_DIR="$HBASE_HOME/conf"
+        fi
+    fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -d "/etc/hbase/conf" ]; then
+        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR 
was set."
+        HBASE_CONF_DIR="/etc/hbase/conf"
+    fi
+fi
+
 
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
 
 if [ -n "${HBASE_CONF_DIR}" ]; then

Reply via email to