Repository: incubator-trafodion Updated Branches: refs/heads/release1.3 6ae976fdc -> 7d3124fa7
support vanilla hbase 0.98.10 Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/cd5fd033 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/cd5fd033 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/cd5fd033 Branch: refs/heads/release1.3 Commit: cd5fd033ee10694e3d76602099eef9a7581865a5 Parents: 6ae976f Author: mashengchen <[email protected]> Authored: Thu Jan 21 05:01:29 2016 +0000 Committer: mashengchen <[email protected]> Committed: Thu Jan 21 05:01:29 2016 +0000 ---------------------------------------------------------------------- core/sqf/sqenvcom.sh | 2 +- core/sqf/sql/scripts/install_apache_hadoop | 4 +- core/sqf/src/seatrans/hbase-trx/Makefile | 2 +- core/sqf/src/seatrans/hbase-trx/pom.xml.apache | 4 +- .../transactional/SsccTransactionalTable.java | 37 +- .../transactional/TransactionalTable.java | 43 +-- .../SingleVersionDeleteNotSupported.java | 9 +- .../transactional/TrxTransactionState.java | 367 +++++++++++-------- 8 files changed, 266 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/sqenvcom.sh ---------------------------------------------------------------------- diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh index e367326..96f3698 100644 --- a/core/sqf/sqenvcom.sh +++ b/core/sqf/sqenvcom.sh @@ -603,7 +603,7 @@ EOF export HIVE_JAR_DIRS="$APACHE_HIVE_HOME/lib" - export HBASE_TRX_JAR=hbase-trx-hbase_98_4-${TRAFODION_VER}.jar + export HBASE_TRX_JAR=hbase-trx-hbase_98_10-${TRAFODION_VER}.jar # end of code for Apache Hadoop/HBase installation w/o distro else http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/sql/scripts/install_apache_hadoop ---------------------------------------------------------------------- diff --git a/core/sqf/sql/scripts/install_apache_hadoop b/core/sqf/sql/scripts/install_apache_hadoop index 8d766ea..84b317b 100755 --- a/core/sqf/sql/scripts/install_apache_hadoop +++ b/core/sqf/sql/scripts/install_apache_hadoop @@ -554,10 +554,10 @@ HIVE_TAR=${HIVE_PREFIX}.tar.gz #HBASE_MIRROR_URL=http://psg.mtu.edu/pub/apache/hbase/hbase-0.98.3 #HBASE_MIRROR_URL=http://archive.cloudera.com/cdh5/cdh/5 -HBASE_MIRROR_URL=http://archive.apache.org/dist/hbase/hbase-0.98.6/ +HBASE_MIRROR_URL=http://archive.apache.org/dist/hbase/hbase-0.98.10/ #HBASE_TAR=hbase-0.98.6-cdh5.3.0.tar.gz -HBASE_TAR=hbase-0.98.6-hadoop2-bin.tar.gz +HBASE_TAR=hbase-0.98.10-hadoop2-bin.tar.gz #HBASE_TAR=hbase-0.98.4-hadoop2-bin.tar.gz if [[ "$SQ_HBASE_DISTRO" = "HDP" ]]; then HBASE_TAR=hbase-0.98.4.2.2.0.0-2041-hadoop2.tar.gz http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/Makefile ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/Makefile b/core/sqf/src/seatrans/hbase-trx/Makefile index 5cab6dd..e5e25f9 100644 --- a/core/sqf/src/seatrans/hbase-trx/Makefile +++ b/core/sqf/src/seatrans/hbase-trx/Makefile @@ -21,7 +21,7 @@ # This Makefile is just a thin shell to Maven, which is used to do the real build -BLD_HBASE_APACHE_TRX_JARNAME =hbase-trx-hbase_98_4-$(TRAFODION_VER).jar +BLD_HBASE_APACHE_TRX_JARNAME =hbase-trx-hbase_98_10-$(TRAFODION_VER).jar BLD_HBASE_CDH_TRX_JARNAME =hbase-trx-cdh5_3-$(TRAFODION_VER).jar BLD_HBASE_MAPR_TRX_JARNAME =hbase-trx-mapr4_0-$(TRAFODION_VER).jar BLD_HBASE_HDP_TRX_JARNAME =hbase-trx-hdp2_2-$(TRAFODION_VER).jar http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/pom.xml.apache ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache index c717479..f936c1b 100755 --- a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache +++ b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache @@ -58,7 +58,7 @@ <properties> <hadoop.version>2.4.0</hadoop.version> - <hbase.version>0.98.4-hadoop2</hbase.version> + <hbase.version>0.98.10-hadoop2</hbase.version> <!--<hbase.version>0.98.0.2.1.4.0-632-hadoop2</hbase.version>--> <!--<hbase.version>0.98.3-hadoop2</hbase.version>--> <!--<hbase.version>0.98.3-hadoop1</hbase.version>--> @@ -70,7 +70,7 @@ <groupId>org.apache</groupId> <modelVersion>4.0.0</modelVersion> - <artifactId>hbase-trx-hbase_98_4</artifactId> + <artifactId>hbase-trx-hbase_98_10</artifactId> <version>${env.TRAFODION_VER}</version> <name>HBase - Trx</name> <description>Trx of HBase usage</description> http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java index 7bc51cb..559e9e3 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java @@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; @@ -671,25 +673,24 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting request startid: " + tr } } + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to insert"); + } + if (maxKeyValueSize > 0) { + for (List<Cell> list : put.getFamilyCellMap().values()) { + for (Cell c : list) { + if (KeyValueUtil.length(c) > maxKeyValueSize) { + throw new IllegalArgumentException("KeyValue size too large"); + } + } + } + } + } - // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException { - if (put.isEmpty()) { - throw new IllegalArgumentException("No columns to insert"); - } - if (maxKeyValueSize > 0) { - for (List<KeyValue> list : put.getFamilyMap().values()) { - for (KeyValue kv : list) { - if (kv.getLength() > maxKeyValueSize) { - throw new IllegalArgumentException( - "KeyValue size too large"); - } - } - } - } - } - public HRegionLocation getRegionLocation(byte[] row, boolean f) - throws IOException { + public HRegionLocation getRegionLocation(byte[] row, boolean f) throws IOException { return super.getRegionLocation(row, f); } public void close() throws IOException { super.close(); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java index 20450d1..0e0cd9e 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java @@ -47,9 +47,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -744,27 +746,26 @@ public class TransactionalTable extends HTable implements TransactionalTableClie throw new IOException(result.getException()); } } - - // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException { - if (put.isEmpty()) { - throw new IllegalArgumentException("No columns to insert"); - } - if (maxKeyValueSize > 0) { - for (List<KeyValue> list : put.getFamilyMap().values()) { - for (KeyValue kv : list) { - if (kv.getLength() > maxKeyValueSize) { - throw new IllegalArgumentException( - "KeyValue size too large"); - } - } - } - } - } - - private int maxKeyValueSize; -public HRegionLocation getRegionLocation(byte[] row, boolean f) - throws IOException { + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + if (put.isEmpty()) { + throw new IllegalArgumentException("No columns to insert"); + } + if (maxKeyValueSize > 0) { + for (List<Cell> list : put.getFamilyCellMap().values()) { + for (Cell c : list) { + if (KeyValueUtil.length(c) > maxKeyValueSize) { + throw new IllegalArgumentException("KeyValue size too large"); + } + } + } + } + } + + private int maxKeyValueSize; + + public HRegionLocation getRegionLocation(byte[] row, boolean f) throws IOException { return super.getRegionLocation(row,f); } public void close() throws IOException { super.close(); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java index 7e7c1c1..976803e 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SingleVersionDeleteNotSupported.java @@ -26,6 +26,7 @@ package org.apache.hadoop.hbase.regionserver.transactional; import java.util.Collection; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; @@ -53,10 +54,10 @@ public class SingleVersionDeleteNotSupported extends DoNotRetryIOException { * mechansim will currently treat DeleteColumn the same as Delete which could cause confusion. */ public static void validateDelete(final Delete delete) throws SingleVersionDeleteNotSupported { - Collection<List<KeyValue>> values = delete.getFamilyMap().values(); - for (List<KeyValue> value : values) { - for (KeyValue kv : value) { - if (Type.Delete.getCode() == kv.getType()) { + Collection<List<Cell>> values = delete.getFamilyCellMap().values(); + for (List<Cell> value : values) { + for (Cell cell : value) { + if (cell.getTypeByte() == Type.Delete.getCode()) { throw new SingleVersionDeleteNotSupported(); } } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/cd5fd033/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java index 79816d7..4e22cd9 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java @@ -24,20 +24,15 @@ package org.apache.hadoop.hbase.regionserver.transactional; import java.io.IOException; - -import java.lang.Class; - import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; -import java.util.ListIterator; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; @@ -45,31 +40,27 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.codec.binary.Hex; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.io.DataInputBuffer; /** @@ -78,53 +69,96 @@ import org.apache.hadoop.io.DataInputBuffer; */ public class TrxTransactionState extends TransactionState{ - static boolean sb_sqm_98_1; - static boolean sb_sqm_98_4; + static boolean sb_sqm_98_1 = false; + static boolean sb_sqm_98_4 = false; + static boolean sb_sqm_98_9 = false; + static java.lang.reflect.Constructor c98_1 = null; static java.lang.reflect.Constructor c98_4 = null; + static java.lang.reflect.Constructor c98_9 = null; + + static Class keepDeletedCellsClazz = null; + static Class scaninfoClazz = null; + static Constructor scaninfoConstructor = null; + static Object[] scaninfoArgs = null; static { - sb_sqm_98_1 = true; - try { - NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null; - c98_1 = ScanQueryMatcher.class.getConstructor( - new Class [] { - Scan.class, - ScanInfo.class, - java.util.NavigableSet.class, - ScanType.class, - long.class, - long.class, - long.class - }); - } - catch (NoSuchMethodException exc_nsm) { - sb_sqm_98_1 = false; - sb_sqm_98_4 = true; - try { - c98_4 = ScanQueryMatcher.class.getConstructor( - new Class [] { - Scan.class, - ScanInfo.class, - java.util.NavigableSet.class, - ScanType.class, - long.class, - long.class, - long.class, - RegionCoprocessorHost.class - }); - } - catch (NoSuchMethodException exc_nsm2) { - sb_sqm_98_4 = false; - } - } + String version = VersionInfo.getVersion();// the hbase version string, eg. "0.6.3-dev" + LOG.info("Got info of Class ScanQueryMatcher for HBase version :" + version); + + try { + c98_1 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class, + ScanInfo.class, + java.util.NavigableSet.class, + ScanType.class, + long.class, + long.class, + long.class }); + LOG.info("Got info of Class ScanQueryMatcher for HBase 98.1"); + sb_sqm_98_1 = true; + } catch (NoSuchMethodException e) { + try { + c98_4 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class, + ScanInfo.class, + java.util.NavigableSet.class, + ScanType.class, + long.class, + long.class, + long.class, + RegionCoprocessorHost.class }); + LOG.info("Got info of Class ScanQueryMatcher for HBase 98.4"); + sb_sqm_98_4 = true; + } catch (NoSuchMethodException e1) { + try { + c98_9 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class, + ScanInfo.class, + java.util.NavigableSet.class, + ScanType.class, + long.class, + long.class, + long.class, + long.class, + RegionCoprocessorHost.class }); + LOG.info("Got info of Class ScanQueryMatcher for HBase 98.9"); + sb_sqm_98_9 = true; + } catch (NoSuchMethodException e2) { + throw new RuntimeException("HBase version :" + version + ". No matcher ScanQueryMatcher."); + } + } + } + + + try { + scaninfoClazz = Class.forName("org.apache.hadoop.hbase.regionserver.ScanInfo"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getMessage()); + } + Class[] types = null; - if (sb_sqm_98_1) { - LOG.info("Got info of Class ScanQueryMatcher for HBase 98.1"); - } - if (sb_sqm_98_4) { - LOG.info("Got info of Class ScanQueryMatcher for HBase 98.4"); - } + try { + types = new Class[] { byte[].class, int.class, int.class, long.class, boolean.class, long.class, KVComparator.class }; + scaninfoConstructor = scaninfoClazz.getConstructor(types); + scaninfoArgs = new Object[] { null, 0, 1, HConstants.FOREVER, false, 0, KeyValue.COMPARATOR }; + if (LOG.isTraceEnabled()) + LOG.trace("Created ScanInfo instance before HBase 98.8"); + } catch (Exception e) { + + try { + keepDeletedCellsClazz = Class.forName("org.apache.hadoop.hbase.KeepDeletedCells"); + } catch (ClassNotFoundException e1) { + throw new RuntimeException(e1.getMessage()); + } + types = new Class[] { byte[].class, int.class, int.class, long.class, keepDeletedCellsClazz, long.class, KVComparator.class }; + try { + scaninfoConstructor = scaninfoClazz.getConstructor(types); + scaninfoArgs = new Object[] { null, 0, 1, HConstants.FOREVER, Enum.valueOf(keepDeletedCellsClazz, "FALSE"), 0, KeyValue.COMPARATOR }; + if (LOG.isTraceEnabled()) + LOG.trace("Created ScanInfo instance after HBase 98.8"); + } catch (Exception e1) { + LOG.error("Created ScanInfo instance ERROR"); + throw new RuntimeException(e1.getMessage()); + } + } } /** @@ -199,14 +233,14 @@ public class TrxTransactionState extends TransactionState{ e.add(kv); } try { - long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), - e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor, - this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE); - //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId); - if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 + long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), + e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor, + this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE); + //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId); + if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 } catch (IOException exp1) { - LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception"); + LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception"); //throw exp1; } } @@ -222,7 +256,11 @@ public class TrxTransactionState extends TransactionState{ public boolean hasWrite() { return writeOrdering.size() > 0; } - + + public int writeSize() { + return writeOrdering.size(); + } + public synchronized void addDelete(final Delete delete) { if (LOG.isTraceEnabled()) LOG.trace("addDelete -- ENTRY: delete: " + delete.toString()); @@ -245,14 +283,13 @@ public class TrxTransactionState extends TransactionState{ e.add(kv); } try { - long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), + long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(), e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor, this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE); //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with txid " + txid + " ts flush id " + this.flushTxId); - if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 - } - catch (IOException exp1) { - LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception"); + if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1 + } catch (IOException exp1) { + LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception"); } } else { @@ -649,61 +686,87 @@ public class TrxTransactionState extends TransactionState{ super.setSequenceID(Long.MAX_VALUE); //Store.ScanInfo scaninfo = new Store.ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, Cell.COMPARATOR); - ScanInfo scaninfo = new ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, KeyValue.COMPARATOR); + //after hbase 0.98.8, ScanInfo instance need KeepDeletedCells as param instead of boolean + ScanInfo scaninfo = null; + try { + scaninfo = (ScanInfo) scaninfoConstructor.newInstance(scaninfoArgs); + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } try { - if (sb_sqm_98_1) { - try { - matcher = (ScanQueryMatcher) c98_1.newInstance(scan, - scaninfo, - null, - ScanType.USER_SCAN, - Long.MAX_VALUE, - HConstants.LATEST_TIMESTAMP, - 0); - if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.1"); - } - catch (InstantiationException exc_ins) { - LOG.error("InstantiationException: " + exc_ins); - } - catch (IllegalAccessException exc_ill_acc) { - LOG.error("IllegalAccessException: " + exc_ill_acc); - } - catch (InvocationTargetException exc_inv_tgt) { - LOG.error("InvocationTargetException: " + exc_inv_tgt); - } - - } - else { - try { - matcher = (ScanQueryMatcher) c98_4.newInstance(scan, - scaninfo, - null, - ScanType.USER_SCAN, - Long.MAX_VALUE, - HConstants.LATEST_TIMESTAMP, - (long) 0, - null); - if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.4"); - } - catch (InstantiationException exc_ins) { - LOG.error("InstantiationException: " + exc_ins); - } - catch (IllegalAccessException exc_ill_acc) { - LOG.error("IllegalAccessException: " + exc_ill_acc); - } - catch (InvocationTargetException exc_inv_tgt) { - LOG.error("InvocationTargetException: " + exc_inv_tgt); - } - - } + if (sb_sqm_98_1) { + try { + matcher = (ScanQueryMatcher) c98_1.newInstance(scan, + scaninfo, + null, + ScanType.USER_SCAN, + Long.MAX_VALUE, + HConstants.LATEST_TIMESTAMP, + 0); + if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.1"); + } + catch (InstantiationException exc_ins) { + LOG.error("InstantiationException: " + exc_ins); + } + catch (IllegalAccessException exc_ill_acc) { + LOG.error("IllegalAccessException: " + exc_ill_acc); + } + catch (InvocationTargetException exc_inv_tgt) { + LOG.error("InvocationTargetException: " + exc_inv_tgt); + } + + } + else if(sb_sqm_98_4) { + try { + matcher = (ScanQueryMatcher) c98_4.newInstance(scan, + scaninfo, + null, + ScanType.USER_SCAN, + Long.MAX_VALUE, + HConstants.LATEST_TIMESTAMP, + (long) 0, + null); + if (LOG.isTraceEnabled()) LOG.trace("Created matcher using reflection for HBase 98.4"); + } + catch (InstantiationException exc_ins) { + LOG.error("InstantiationException: " + exc_ins); + } + catch (IllegalAccessException exc_ill_acc) { + LOG.error("IllegalAccessException: " + exc_ill_acc); + } + catch (InvocationTargetException exc_inv_tgt) { + LOG.error("InvocationTargetException: " + exc_inv_tgt); + } + } + else { + try { + matcher = (ScanQueryMatcher) c98_9.newInstance(scan, + scaninfo, + null, + ScanType.USER_SCAN, + Long.MAX_VALUE, + HConstants.LATEST_TIMESTAMP, + 0l, + EnvironmentEdgeManager.currentTimeMillis(), + null); + if (LOG.isTraceEnabled()) + LOG.trace("Created matcher using reflection for HBase 98.9"); + } catch (InstantiationException exc_ins) { + LOG.error("InstantiationException: " + exc_ins); + } catch (IllegalAccessException exc_ill_acc) { + LOG.error("IllegalAccessException: " + exc_ill_acc); + } catch (InvocationTargetException exc_inv_tgt) { + LOG.error("InvocationTargetException: " + exc_inv_tgt); + } + } } catch (Exception e) { LOG.error("error while instantiating the ScanQueryMatcher()" + e); } - } + /** * Get the next row of values from this transaction. * @@ -918,45 +981,43 @@ public class TrxTransactionState extends TransactionState{ } synchronized List<KeyValue> getKeyValues() { - List<KeyValue> edits = new ArrayList<KeyValue>(); - Collection<List<KeyValue>> kvsList = null; + List<KeyValue> edits = new ArrayList<KeyValue>(); + Collection<List<Cell>> cellList = null; - if (put != null) { - if (!put.getFamilyMap().isEmpty()) { - kvsList = put.getFamilyMap().values(); - } - } else if (delete != null) { - if (delete.getFamilyCellMap().isEmpty()) { - // If whole-row delete then we need to expand for each - // family - kvsList = new ArrayList<List<KeyValue>>(1); - for (byte[] family : tabledescriptor.getFamiliesKeys()) { - KeyValue familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(), - KeyValue.Type.DeleteFamily); - kvsList.add(Collections.singletonList(familyDelete)); - } - } else { - kvsList = delete.getFamilyMap().values(); - } - } else { - throw new IllegalStateException("WriteAction is invalid"); - } + if (put != null) { + if (!put.getFamilyCellMap().isEmpty()) { + cellList = put.getFamilyCellMap().values(); + } + } else if (delete != null) { + if (delete.getFamilyCellMap().isEmpty()) { + // If whole-row delete then we need to expand for each family + cellList = new ArrayList<List<Cell>>(1); + for (byte[] family : tabledescriptor.getFamiliesKeys()) { + Cell familyDelete = new KeyValue(delete.getRow(), family, null, delete.getTimeStamp(), + KeyValue.Type.DeleteFamily); + cellList.add(Collections.singletonList(familyDelete)); + } + } else { + cellList = delete.getFamilyCellMap().values(); + } + } else { + throw new IllegalStateException("WriteAction is invalid"); + } - if (kvsList != null) { - for (List<KeyValue> kvs : kvsList) { - for (KeyValue kv : kvs) { - edits.add(kv); - //if (LOG.isDebugEnabled()) LOG.debug("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " create edits for transaction: " - // + transactionId + " with Op " + kv.getType()); - } - } - } - else - if (LOG.isTraceEnabled()) LOG.trace("Trafodion getKeyValues: " - + regionInfo.getRegionNameAsString() + " kvsList was null"); - return edits; - } + if (cellList != null) { + for (List<Cell> cells : cellList) { + for (Cell cell : cells) { + edits.add(new KeyValue(cell)); + // if (LOG.isDebugEnabled()) LOG.debug("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " create edits for transaction: " + // + transactionId + " with Op " + kv.getType()); + } + } + } else if (LOG.isTraceEnabled()) + LOG.trace("Trafodion getKeyValues: " + regionInfo.getRegionNameAsString() + " kvsList was null"); + return edits; + } } + public Set<TrxTransactionState> getTransactionsToCheck() { return transactionsToCheck; }
