wuchong commented on a change in pull request #12369: URL: https://github.com/apache/flink/pull/12369#discussion_r439917272
########## File path: flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/java/org/apache/flink/tests/util/hbase/SQLClientHBaseITCase.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.hbase; + +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.cache.DownloadCache; +import org.apache.flink.tests.util.categories.PreCommit; +import org.apache.flink.tests.util.categories.TravisGroup1; +import org.apache.flink.tests.util.flink.ClusterController; +import org.apache.flink.tests.util.flink.FlinkResource; +import org.apache.flink.tests.util.flink.FlinkResourceSetup; +import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.base.Charsets; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; + +import static org.hamcrest.Matchers.arrayContainingInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertThat; + +/** + * End-to-end test for the HBase connectors. + */ +@Category(value = {TravisGroup1.class, PreCommit.class}) +public class SQLClientHBaseITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(SQLClientHBaseITCase.class); + + private static final String HBASE_SOURCE_SINK_SCHEMA = "hbase_source_sink_schema.yaml"; + + @Rule + public final HBaseResource hbase; + + @Rule + public final FlinkResource flink = new LocalStandaloneFlinkResourceFactory() + .create(FlinkResourceSetup.builder().build()); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @ClassRule + public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get(); + + private static final Path sqlToolBoxJar = TestUtils.getResourceJar(".*SqlToolbox.jar"); + private static final Path sqlConnectorHBaseJar = TestUtils.getResourceJar(".*SqlConnectorHbase.jar"); + private Path shadedHadoopJar; + private Path sqlClientSessionConf; + + public SQLClientHBaseITCase() { + this.hbase = HBaseResource.get(); + } + + @Before + public void before() throws Exception { + DOWNLOAD_CACHE.before(); + Path tmpPath = tmp.getRoot().toPath(); + LOG.info("The current temporary path: {}", tmpPath); + this.sqlClientSessionConf = tmpPath.resolve("sql-client-session.conf"); + + //download flink shaded hadoop jar for hbase e2e test using + shadedHadoopJar = DOWNLOAD_CACHE.getOrDownload( Review comment: +1 to remove the Hadoop jar if we are running with the Hadoop profile. 
########## File path: flink-connectors/flink-sql-connector-hbase/src/main/resources/hbase-default.xml ########## @@ -0,0 +1,1558 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<!-- +OVERVIEW +The important configs. are listed near the top. You should change +at least the setting for hbase.tmp.dir. Other settings will change +dependent on whether you are running hbase in standalone mode or +distributed. See the hbase reference guide for requirements and +guidance making configuration. +This file does not contain all possible configurations. The file would be +much larger if it carried everything. The absent configurations will only be +found through source code reading. The idea is that such configurations are +exotic and only those who would go to the trouble of reading a particular +section in the code would be knowledgeable or invested enough in ever wanting +to alter such configurations, so we do not list them here. Listing all +possible configurations would overwhelm and obscure the important. +--> + +<configuration> Review comment: Why do we need this file at client side? 
########## File path: flink-connectors/pom.xml ########## @@ -57,6 +57,7 @@ under the License. <module>flink-connector-gcp-pubsub</module> <module>flink-connector-kinesis</module> <module>flink-sql-connector-elasticsearch7</module> + <module>flink-sql-connector-hbase</module> Review comment: Move `<module>flink-sql-connector-hbase</module>` and `<module>flink-sql-connector-elasticsearch7</module>` into the following `sql-jars` profile. ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/test/resources/hbase_source_sink_schema.yaml ########## @@ -0,0 +1,55 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +tables: + - name: MyHBaseSource + type: source-table + schema: + - name: rowkey + type: STRING + - name: a + type: ROW<c1 STRING> + - name: b + type: ROW<c1 STRING> + connector: + type: "hbase" + version: "1.4.3" + table-name: "source_table" + zookeeper: + quorum: "localhost:2181" + znode.parent: "/hbase" + - name: MyHBaseSink + type: sink-table + schema: + - name: rowkey + type: STRING + - name: a + type: ROW<c1 STRING> + - name: b + type: ROW<c1 STRING> + connector: + type: "hbase" + version: "1.4.3" + table-name: "sink_table" + zookeeper: + quorum: "localhost:2181" + znode.parent: "/hbase" + write.buffer-flush: + max-size: "2mb" + max-rows: 1000 Review comment: We can use `max-rows: 1` to quickly flush the messages instead of waiting for 2 seconds. ########## File path: flink-end-to-end-tests/flink-end-to-end-tests-hbase/src/main/java/org/apache/flink/tests/util/hbase/LocalStandaloneHBaseResource.java ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.hbase; + +import org.apache.flink.tests.util.AutoClosableProcess; +import org.apache.flink.tests.util.CommandLineWrapper; +import org.apache.flink.tests.util.activation.OperatingSystemRestriction; +import org.apache.flink.tests.util.cache.DownloadCache; +import org.apache.flink.util.OperatingSystem; + +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * {@link HBaseResource} that downloads hbase and set up a local hbase cluster. + */ +public class LocalStandaloneHBaseResource implements HBaseResource { + + private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class); + + private final TemporaryFolder tmp = new TemporaryFolder(); + + private final DownloadCache downloadCache = DownloadCache.get(); + private Path hbaseDir; + + LocalStandaloneHBaseResource() { + OperatingSystemRestriction.forbid( + String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()), + OperatingSystem.WINDOWS); + } + + private static String getHBaseDownloadUrl() { + return "https://archive.apache.org/dist/hbase/1.4.3/hbase-1.4.3-bin.tar.gz"; + } + + @Override + public void before() throws Exception { + tmp.create(); + downloadCache.before(); + + this.hbaseDir = tmp.newFolder("hbase").toPath().toAbsolutePath(); + setupHBaseDist(); + setupHBaseCluster(); + } + + private void setupHBaseDist() throws IOException { + final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath(); + final Path hbaseArchive = 
downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory); + + LOG.info("HBase localtion: {}", hbaseDir.toAbsolutePath()); + AutoClosableProcess.runBlocking(CommandLineWrapper + .tar(hbaseArchive) + .extract() + .zipped() + .strip(1) + .targetDir(hbaseDir) + .build()); + + LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath()); + final String tmpDirConfig = "<configuration><property><name>hbase.tmp.dir</name><value>" + hbaseDir + "</value></property></configuration>"; + Files.write(hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")), tmpDirConfig.getBytes()); + } + + private void setupHBaseCluster() throws IOException { + LOG.info("Starting HBase cluster"); + AutoClosableProcess.runBlocking( + hbaseDir.resolve(Paths.get("bin", "start-hbase.sh")).toString()); + + while (!isHBaseRunning()) { + try { + LOG.info("Waiting for HBase to start"); + Thread.sleep(500L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + + @Override + public void afterTestSuccess() { + try { + LOG.info("Stopping HBase Cluster"); + AutoClosableProcess.runBlocking( + hbaseDir.resolve(Paths.get("bin", "hbase-daemon.sh")).toString(), + "stop", + "master"); + + while (isHBaseRunning()) { + try { + LOG.info("Waiting for HBase to stop"); + Thread.sleep(500L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + + } catch (IOException ioe) { + LOG.warn("Error while shutting down hbase.", ioe); + } + downloadCache.afterTestSuccess(); + tmp.delete(); + } + + private static boolean isHBaseRunning() { + try { + final AtomicBoolean atomicHMasterStarted = new AtomicBoolean(false); + queryHMasterStatus(line -> atomicHMasterStarted.compareAndSet(false, line.contains("HMaster"))); + return atomicHMasterStarted.get(); + } catch (IOException ioe) { + return false; + } + } + + private static void queryHMasterStatus(final Consumer<String> stdoutProcessor) throws IOException { + AutoClosableProcess + 
.create("jps") Review comment: ??? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
