Author: bmahe
Date: Mon Dec 5 21:32:43 2011
New Revision: 1210649
URL: http://svn.apache.org/viewvc?rev=1210649&view=rev
Log:
BIGTOP-287. Integrating test for HBASE-4570 (contributed by Stephen Chu)
Added:
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java
Added:
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
URL:
http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java?rev=1210649&view=auto
==============================================================================
---
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
(added)
+++
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Putter.java
Mon Dec 5 21:32:43 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Command-line helper that scans a table for rows whose "f1:qual" column
+ * equals a given zero-padded value, and writes every non-empty cell of each
+ * returned row back to the table with the same value.
+ */
+public class Putter {
+  /**
+   * Builds a {@link Put} that re-writes the non-empty cells of a scanned row.
+   *
+   * @param result   the scanned row; may be {@code null}
+   * @param versions not used by this method; kept so existing callers compile
+   * @return a Put keyed on the row of {@code result} holding each
+   *         family/qualifier whose value is non-null and non-empty, or
+   *         {@code null} when {@code result}, its row key or its
+   *         family map is {@code null}
+   */
+  public static Put convert(Result result, int versions) {
+    if (result == null) {
+      return null;
+    }
+
+    NavigableMap<byte[], NavigableMap<byte[], byte[]>> cfmap =
+        result.getNoVersionMap();
+    byte[] row = result.getRow();
+    if (row == null || cfmap == null) {
+      return null;
+    }
+
+    Put put = new Put(row);
+    for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry :
+        cfmap.entrySet()) {
+      NavigableMap<byte[], byte[]> qualifierMap = familyEntry.getValue();
+      if (qualifierMap == null) {
+        continue;
+      }
+
+      for (Map.Entry<byte[], byte[]> cell : qualifierMap.entrySet()) {
+        byte[] value = cell.getValue();
+        // Empty values are skipped, exactly as null values are.
+        if (value != null && value.length > 0) {
+          put.add(familyEntry.getKey(), cell.getKey(), value);
+        }
+      }
+    }
+
+    return put;
+  }
+
+  /**
+   * Same as {@link #doScanAndPut(HTable, int, boolean)} with autoflush
+   * enabled.
+   */
+  public static int doScanAndPut(HTable table, int val) throws IOException {
+    return doScanAndPut(table, val, true);
+  }
+
+  /**
+   * Scans the whole table with a filter on "f1:qual" equal to {@code val}
+   * formatted as a 10-digit zero-padded string, and puts each returned row
+   * back into the table.
+   *
+   * @param table     table to scan and write to
+   * @param val       value the "f1:qual" column is compared against
+   * @param autoflush passed to {@link HTable#setAutoFlush(boolean)} before
+   *                  the scan starts
+   * @return the number of puts issued
+   * @throws IOException if the scan or a put fails
+   */
+  public static int doScanAndPut(HTable table, int val, boolean autoflush)
+      throws IOException {
+    Scan s = new Scan();
+    // Empty start and stop rows: scan the full key range.
+    s.setStartRow(new byte[0]);
+    s.setStopRow(new byte[0]);
+    byte[] value = Bytes.toBytes(String.format("%010d", val));
+    s.setFilter(new SingleColumnValueFilter(
+        Bytes.toBytes("f1"), Bytes.toBytes("qual"), CompareOp.EQUAL, value));
+
+    table.setAutoFlush(autoflush);
+
+    int cnt = 0;
+    ResultScanner sc = table.getScanner(s);
+    try {
+      for (Result r : sc) {
+        Put p = convert(r, 0);
+        if (p == null) {
+          // convert() found nothing to write for this result.
+          continue;
+        }
+        table.put(p);
+        cnt++;
+      }
+    } finally {
+      // Always release the scanner, even when a put fails mid-scan.
+      sc.close();
+    }
+    return cnt;
+  }
+
+  /** Prints the command-line synopsis to stderr and exits with status 1. */
+  private static void usage() {
+    System.err.println("usage: " + Putter.class.getSimpleName() +
+        " <table> <value> [-f] [-l <loops>]");
+    System.err.println(" <value>: a numeric value [0,500)");
+    System.err.println(" -f: disable autoflush");
+    System.err.println(" -l <loops>: number of scan-and-put passes (default 1)");
+    System.exit(1);
+  }
+
+  /**
+   * Entry point: {@code Putter <table> <value> [-f] [-l <loops>]}.
+   * An IOException in one pass is printed and the next pass still runs.
+   */
+  public static void main(String argv[]) throws IOException {
+    if (argv.length < 2) {
+      usage();
+      return;
+    }
+
+    boolean autoflush = true;
+    int loops = 1;
+    // Options follow the two positional arguments.
+    for (int i = 2; i < argv.length; i++) {
+      if (argv[i].equals("-f")) {
+        autoflush = false;
+      } else if (argv[i].equals("-l")) {
+        i++;
+        if (i >= argv.length) {
+          // "-l" given without a loop count.
+          usage();
+          return;
+        }
+        loops = Integer.parseInt(argv[i]);
+      }
+    }
+
+    String tableName = argv[0];
+    int val = Integer.parseInt(argv[1]);
+    HTable table = new HTable(tableName);
+    try {
+      for (int i = 0; i < loops; i++) {
+        try {
+          doScanAndPut(table, val, autoflush);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    } finally {
+      // Closing the table also pushes out puts still buffered under -f.
+      table.close();
+    }
+  }
+}
Added:
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
URL:
http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java?rev=1210649&view=auto
==============================================================================
---
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
(added)
+++
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/Scanner.java
Mon Dec 5 21:32:43 2011
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * This program scans a table a configurable number of times. Uses
+ * the table record reader.
+ */
+public class Scanner {
+ public static final Log LOG = LogFactory.getLog(Scanner.class);
+
+ public static int doScan(HTable table, int val) throws IOException,
+ InterruptedException {
+ Scan s = new Scan();
+ byte[] start = {};
+ byte[] stop = {};
+ byte[] value = Bytes.toBytes(String.format("%010d", val));
+ s.setStartRow(start);
+ s.setStopRow(stop);
+ SingleColumnValueFilter filter = new SingleColumnValueFilter(
+ Bytes.toBytes("f1"), Bytes.toBytes("qual"), CompareOp.EQUAL, value);
+ s.setFilter(filter);
+
+ // Keep track of gathered elements.
+ Multimap<String, String> mm = ArrayListMultimap.create();
+
+ // Counts
+ int cnt = 0;
+ long i = 0;
+ ResultScanner rs = table.getScanner(s);
+ for (Result r : rs) {
+ if (r.getRow() == null) {
+ continue;
+ }
+
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long,
+ byte[]>>> columnFamilyMap = r.getMap();
+
+ // Output time to show if flush related.
+ String k = Bytes.toStringBinary(r.getRow());
+ if (mm.get(k).size() >= 1) {
+ System.out.println("Duplicate rowkey " + k);
+ LOG.error("Duplicate rowkey " + k);
+ }
+
+ mm.put(Bytes.toStringBinary(r.getRow()), i + ": " + r);
+ cnt++;
+ i++;
+ }
+
+ System.out.println("scan items counted: " + cnt + " for scan " +
+ s.toString() + " with filter f1:qual == " + Bytes.toString(value));
+
+ // Print out dupes.
+ int dupes = 0;
+ for (Entry<String, Collection<String>> e : mm.asMap().entrySet()) {
+ if (e.getValue().size() > 1) {
+ dupes++;
+ System.out.print("Row " + e.getKey() + " had time stamps: ");
+ String[] tss = e.getValue().toArray(new String[0]);
+ System.out.println(Arrays.toString(tss));
+ }
+ }
+
+ return dupes;
+ }
+
+ public static void main(String argv[]) throws IOException {
+ if (argv.length < 2) {
+ System.err.println("usage: " + Scanner.class.getSimpleName() +
+ " <table> <value>");
+ System.err.println(" <value>: a numeric value [0,500)");
+ System.exit(1);
+ }
+
+ String tableName = argv[0];
+ int val = Integer.parseInt(argv[1]);
+ int loops = 1;
+ for (int i = 1; i < argv.length; i++) {
+ if (argv[i].equals("-l")) {
+ i++;
+ loops = Integer.parseInt(argv[i]);
+ }
+ }
+
+ HTable table = new HTable(tableName);
+ int exitVal = 0;
+ for (int i = 0; i < loops; i++) {
+ try {
+ exitVal = doScan(table, val);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ if (exitVal != 0) {
+ break;
+ }
+ }
+ System.exit(exitVal);
+ }
+}
Added:
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java
URL:
http://svn.apache.org/viewvc/incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java?rev=1210649&view=auto
==============================================================================
---
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java
(added)
+++
incubator/bigtop/trunk/bigtop-tests/test-artifacts/hbase/src/main/groovy/org/apache/bigtop/itest/hbase/system/TestConcurrentScanAndPut.java
Mon Dec 5 21:32:43 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.itest.hbase.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.bigtop.itest.JarContent;
+import org.apache.bigtop.itest.shell.Shell;
+
+import org.apache.bigtop.itest.hbase.util.HBaseTestUtil;
+
+/**
+ * This program tests concurrent scans and writes. In HBASE-4570,
+ * when scanning a table during concurrent writes, rows that have
+ * multiple column families sometimes get split into two rows.
+ * <p/>
+ * The test loads a table, starts {@link Putter} as a background process,
+ * runs {@link Scanner} in the foreground and expects the scanner process to
+ * exit with status 0.
+ */
+public class TestConcurrentScanAndPut {
+  public static Shell scanSh = new Shell( "/bin/bash -s" );
+  public static Shell putSh = new Shell( "/bin/bash -s" );
+
+  public static HBaseAdmin admin;
+  public static String tableName;
+  public static String putter_pid;
+
+  public static int scannerLoops;
+  public static int putterLoops;
+
+  /** Number of column families ("f0".."f9") created and filled per row. */
+  private static final int FAMILIES = 10;
+
+  /**
+   * Reads an integer system property.
+   *
+   * @param name     property name
+   * @param fallback value used when the property is unset or not an integer
+   */
+  private static int intProperty(String name, int fallback) {
+    try {
+      // parseInt(null) also raises NumberFormatException, which covers the
+      // "property not set" case.
+      return Integer.parseInt(System.getProperty(name));
+    } catch (NumberFormatException e) {
+      return fallback;
+    }
+  }
+
+  /**
+   * Unpacks the jars holding Scanner and Putter into the working directory,
+   * creates the test table with ten column families, loads 25000 rows into
+   * it and reads the loop counts from system properties (default 100 each).
+   */
+  @BeforeClass
+  public static void setUp() throws ClassNotFoundException,
+      InterruptedException, IOException {
+    System.out.println("Unpacking resources");
+    JarContent.unpackJarContainer(Scanner.class, "." , null);
+    JarContent.unpackJarContainer(Putter.class, "." , null);
+
+    Configuration conf = HBaseConfiguration.create();
+    try {
+      HBaseAdmin.checkHBaseAvailable(conf);
+    } catch (Exception e) {
+      System.err.println("Hbase is not up. Bailing out.");
+      System.exit(1);
+    }
+
+    tableName =
+        new String(HBaseTestUtil.getTestTableName("concurrentScanAndPut"));
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (int i = 0; i < FAMILIES; i++) {
+      htd.addFamily(new HColumnDescriptor("f" + i));
+    }
+    admin = new HBaseAdmin(conf);
+    admin.createTable(htd);
+
+    Random rnd = new Random();
+    int size = 25000;
+    int batch = 2000;
+
+    HTable table = new HTable(tableName);
+    try {
+      // Sized to hold one full batch between flushes to the table.
+      ArrayList<Put> puts = new ArrayList<Put>(batch);
+
+      System.out.println("Creating table with 10 column families and 25k rows");
+      for (int i = 0; i < size; i++) {
+        String r = String.format("row%010d", i);
+        Put p = new Put(Bytes.toBytes(r));
+        // The 1000-character payload depends only on the row index, so it is
+        // built once per row rather than once per column family.
+        byte[] bigvalue = Bytes.toBytes(String.format(
+            "%0100d%0100d%0100d%0100d%0100d" +
+            "%0100d%0100d%0100d%0100d%0100d",
+            i, i, i, i, i, i, i, i, i, i));
+        for (int j = 0; j < FAMILIES; j++) {
+          byte[] family = Bytes.toBytes("f" + j);
+          String value = String.format("%010d", rnd.nextInt(500));
+          p.add(family, Bytes.toBytes("qual"), Bytes.toBytes(value));
+          p.add(family, Bytes.toBytes("data"), bigvalue);
+        }
+        puts.add(p);
+        if (i % batch == (batch - 1)) {
+          table.put(puts);
+          puts.clear();
+          System.out.println("put " + i);
+        }
+      }
+      // Write whatever is left over from the last partial batch.
+      table.put(puts);
+      table.flushCommits();
+    } finally {
+      table.close();
+    }
+
+    scannerLoops = intProperty("concurrentScanAndPut.scanner.loops", 100);
+    putterLoops = intProperty("concurrentScanAndPut.putter.loops", 100);
+  }
+
+  /**
+   * Kills the background putter process, if one was started, and drops the
+   * test table, if the admin handle was created.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (putter_pid != null) {
+      System.out.println("Killing putter process");
+      putSh.exec("kill -9 " + putter_pid);
+    }
+
+    if (admin != null) {
+      System.out.println("Removing test table " + tableName);
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Starts the putter in the background, runs the scanner in the foreground
+   * and fails when the scanner's exit status is non-zero.
+   */
+  @Test
+  public void testConcurrentScanAndPut() {
+    System.out.println("Starting puts to test table " + tableName);
+    // Class names are taken from the classes themselves so the launched
+    // command always matches the package these helpers are declared in.
+    putSh.exec("(HBASE_CLASSPATH=. " +
+        "hbase " + Putter.class.getName() + " " +
+        tableName + " 13 -l " + putterLoops +
+        " > /dev/null 2>&1 & echo $! ) 2> /dev/null");
+    // The subshell echoes the PID of the background job as its first line.
+    putter_pid = putSh.getOut().get(0);
+
+    System.out.println("Starting concurrent scans of test table " +
+        tableName);
+    scanSh.exec("HBASE_CLASSPATH=. hbase " +
+        Scanner.class.getName() + " " +
+        tableName + " 13 -l " + scannerLoops + " 2>/dev/null");
+
+    int splitRows = scanSh.getRet();
+    System.out.println("Split rows: " + splitRows);
+    assertTrue("Rows were split when scanning table with concurrent writes",
+        splitRows == 0);
+  }
+}