[
https://issues.apache.org/jira/browse/PHOENIX-2900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
chenglei updated PHOENIX-2900:
------------------------------
Description:
When I join a salted table (which was split after creation) with another table
in my business system, I get the following error:
{code:borderStyle=solid}
org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
    at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
    at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
    at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
    at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:111)
    at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:127)
    at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:108)
    at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:103)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at org.apache.phoenix.job.JobManager$InstrumentedJobFutureTask.run(JobManager.java:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
    at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
    at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
    at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
    at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
    at java.lang.Thread.run(Thread.java:745)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:304)
    at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:316)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:164)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
    at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:283)
    at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:188)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:183)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:110)
    at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:739)
    at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:123)
    ... 7 more
{code}
I wrote a unit test to reproduce this problem, as follows:
{code:borderStyle=solid}
public void testSaltTableJoinError() throws Exception {
    // 1. Create the LHS salted table SALT_TEST
    this.jdbcTemplate.update("drop table if exists SALT_TEST");
    this.jdbcTemplate.update(
            "create table SALT_TEST" +
            "(" +
            "id UNSIGNED_INT not null primary key," +
            "appId VARCHAR" +
            ") SALT_BUCKETS=2");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(1,'app1')");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(2,'app2')");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(3,'app3')");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(4,'app4')");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(5,'app5')");
    this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(6,'app6')");

    // 2. Split SALT_TEST at the row key of id=3, i.e. split the first region
    byte[] id3 = Bytes.toBytes(3);
    byte[] rowKey3 = new byte[1 + 4];
    System.arraycopy(id3, 0, rowKey3, 1, 4);
    byte salt3 = SaltingUtil.getSaltingByte(rowKey3, 1, rowKey3.length - 1, 2);
    rowKey3[0] = salt3;
    HBaseAdmin hbaseAdmin = this.getHBaseAdmin();
    hbaseAdmin.split(Bytes.toBytes("SALT_TEST"), rowKey3);

    // 3. Wait for the SALT_TEST split to complete
    while (hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).size() < 3) {
        Thread.sleep(1000);
    }

    // 4. Make sure region0 and region1 are not on the same region server
    HRegionInfo region1Info = hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).get(1);
    String region1EncodedName = region1Info.getEncodedName();
    System.out.println(region1EncodedName);
    hbaseAdmin.move(Bytes.toBytes(region1EncodedName), null);

    // 5. Create the RHS table RIGHT_TEST
    this.jdbcTemplate.update("drop table if exists RIGHT_TEST");
    this.jdbcTemplate.update(
            "create table RIGHT_TEST" +
            "(" +
            "appId VARCHAR not null primary key," +
            "createTime VARCHAR" +
            ")");
    this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app2','201601')");
    this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app3','201602')");
    this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app4','201603')");
    this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app5','201604')");

    // 6. Join the salted table SALT_TEST with RIGHT_TEST; this throws the exception
    String sql = "select * from SALT_TEST a inner join RIGHT_TEST b" +
            " on a.appId=b.appId where a.id>=3 and a.id<=5";
    List<Map<String, Object>> result = this.jdbcTemplate.queryForList(sql, new Object[0]);
    System.out.println(result);
}
{code}
I think this problem is caused by the following code in
org.apache.phoenix.compile.ScanRanges (around line 178):
{code:borderStyle=solid}
if (useSkipScanFilter && isSalted && !isPointLookup) {
ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
}
{code}
In my opinion, the above if statement should simply be: if (isSalted && !isPointLookup).
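To make the proposal concrete, here is a sketch of the change I have in mind (only the condition changes; everything around it in ScanRanges stays as quoted above):
{code:borderStyle=solid}
// Proposed condition (sketch): drop the useSkipScanFilter check so the leading
// salt-byte range is always expanded for a salted, non-point-lookup scan.
if (isSalted && !isPointLookup) {
    ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
}
{code}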
Another question is why the join SQL executes normally when the salted table's
first region has not been split. I think the answer lies in a subtle code error
in the addServerCache method of the org.apache.phoenix.cache.ServerCacheClient class:
{code:borderStyle=solid}
171 for (HRegionLocation entry : locations) {
172     // Keep track of servers we've sent to and only send once
173     byte[] regionStartKey = entry.getRegionInfo().getStartKey();
174     byte[] regionEndKey = entry.getRegionInfo().getEndKey();
175     if ( ! servers.contains(entry) &&
176             keyRanges.intersects(regionStartKey, regionEndKey,
177                 cacheUsingTable.getIndexType() == IndexType.LOCAL ?
178                     ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {
179         // Call RPC once per server
180         servers.add(entry);
181         if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
182         final byte[] key = entry.getRegionInfo().getStartKey();
183         final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
184         closeables.add(htable);
185         futures.add(executor.submit(new JobCallable<Boolean>() {
186
187             @Override
188             public Boolean call() throws Exception {
189                 final Map<byte[], AddServerCacheResponse> results;
190                 try {
191                     results = htable.coprocessorService(ServerCachingService.class, key, key,
{code}
In the above code, as long as the first region has not been split, the first
region (-∞, \x01\x00\x00\x00\x00) always passes the if test on line 176, and
the start key of that first region is an empty byte[0] array. So on line 191,
when we invoke the htable's coprocessorService method, the key parameter is
byte[0]; that is to say, the startKey and endKey are both byte[0]. But in
org.apache.hadoop.hbase.client.HTable's getKeysAndRegionsInRange method below,
when the endKey parameter equals HConstants.EMPTY_END_ROW (which is byte[0]),
the boolean variable endKeyIsEndOfTable is set to true. This causes all regions
to be returned, so the RHS hash cache is sent to all of the LHS's regions, and
the join SQL of course runs normally:
{code:borderStyle=solid}
private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
        final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
        final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
        throw new IllegalArgumentException(
            "Invalid range: " + Bytes.toStringBinary(startKey) +
            " > " + Bytes.toStringBinary(endKey));
    }
    List<byte[]> keysInRange = new ArrayList<byte[]>();
    List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
    byte[] currentKey = startKey;
    do {
        HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
        keysInRange.add(currentKey);
        regionsInRange.add(regionLocation);
        currentKey = regionLocation.getRegionInfo().getEndKey();
    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
        && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
            || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
    return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange, regionsInRange);
}
{code}
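To illustrate why the unsplit first region masks the bug, here is a minimal standalone snippet (my own illustration, not code from Phoenix or HBase) showing that an empty start key, reused as the endKey, makes endKeyIsEndOfTable true, so the do/while loop above walks every region:
{code:borderStyle=solid}
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class EmptyEndKeyIllustration {
    public static void main(String[] args) {
        // The first region of any HBase table has an empty start key.
        byte[] firstRegionStartKey = new byte[0];

        // ServerCacheClient passes this key as both startKey and endKey of
        // coprocessorService(), so getKeysAndRegionsInRange receives an
        // endKey equal to HConstants.EMPTY_END_ROW ...
        boolean endKeyIsEndOfTable =
                Bytes.equals(firstRegionStartKey, HConstants.EMPTY_END_ROW);

        // ... and with endKeyIsEndOfTable == true, the loop only stops at the
        // end of the table, i.e. every region (and hence every region server)
        // receives the hash cache, which is why the join then succeeds.
        System.out.println(endKeyIsEndOfTable); // prints: true
    }
}
{code}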
> Once a salted table's first region has been split, a join SQL with the
> salted table as LHS may cause a "could not find hash cache for joinId" error
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: PHOENIX-2900
> URL: https://issues.apache.org/jira/browse/PHOENIX-2900
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 4.7.0
> Environment: Phoenix-4.7.0-HBase-0.98,HBase-0.98.6-cdh5.3.2
> Reporter: chenglei
> Priority: Critical
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)