carp84 commented on a change in pull request #9275: [FLINK-13290][table-planner-blink][hbase] SinkCodeGenerator should not compare row type field names and enable blink planner for hbase IT case URL: https://github.com/apache/flink/pull/9275#discussion_r309554436
########## File path: flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/util/HBaseTestBase.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.addons.hbase.util; + +import org.apache.flink.addons.hbase.HBaseConnectorITCase2; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.api.java.LocalEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.addons.hbase.util.PlannerType.BLINK_PLANNER; +import static 
org.apache.flink.addons.hbase.util.PlannerType.OLD_PLANNER; + +@RunWith(Parameterized.class) +public abstract class HBaseTestBase extends HBaseTestingClusterAutostarter { + + protected static final String TEST_TABLE_1 = "testTable1"; + protected static final String TEST_TABLE_2 = "testTable2"; + + protected static final String ROWKEY = "rk"; + protected static final String FAMILY1 = "family1"; + protected static final String F1COL1 = "col1"; + + protected static final String FAMILY2 = "family2"; + protected static final String F2COL1 = "col1"; + protected static final String F2COL2 = "col2"; + + protected static final String FAMILY3 = "family3"; + protected static final String F3COL1 = "col1"; + protected static final String F3COL2 = "col2"; + protected static final String F3COL3 = "col3"; + + @Parameterized.Parameter + public PlannerType planner; + protected EnvironmentSettings streamSettings; + protected EnvironmentSettings batchSettings; + + @Parameterized.Parameters(name = "planner = {0}") + public static PlannerType[] getPlanner() { + return new PlannerType[]{BLINK_PLANNER, OLD_PLANNER}; + } + + @BeforeClass + public static void activateHBaseCluster() throws IOException { + registerHBaseMiniClusterInClasspath(); + prepareTables(); + } + + @Before + public void before() { + EnvironmentSettings.Builder streamBuilder = EnvironmentSettings.newInstance().inStreamingMode(); + EnvironmentSettings.Builder batchBuilder = EnvironmentSettings.newInstance().inBatchMode(); + if (BLINK_PLANNER.equals(planner)) { + this.streamSettings = streamBuilder.useBlinkPlanner().build(); + this.batchSettings = batchBuilder.useBlinkPlanner().build(); + } else if (OLD_PLANNER.equals(planner)) { + this.streamSettings = streamBuilder.useOldPlanner().build(); + this.batchSettings = batchBuilder.useOldPlanner().build(); + } else { + throw new IllegalArgumentException("Unsupported planner name " + planner); + } + } + + private static void prepareTables() throws IOException { + createTable1(); 
+ createTable2(); + } + + private static void createTable1() throws IOException { + // create a table + TableName tableName = TableName.valueOf(TEST_TABLE_1); + // column families + byte[][] families = new byte[][]{ + Bytes.toBytes(FAMILY1), + Bytes.toBytes(FAMILY2), + Bytes.toBytes(FAMILY3) + }; + // split keys + byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) }; + createTable(tableName, families, splitKeys); Review comment: Minor: we could extract L101-L111 out to remove some duplicated code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
