[ 
https://issues.apache.org/jira/browse/PHOENIX-2900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenglei updated PHOENIX-2900:
------------------------------
    Summary: Once a salted table 's first region has been split, the Join SQL 
which the salted table as LHS may cause "Could not find hash cache for joinId" 
error  (was: Once a salted table 's first region has been split, the Join SQL 
which the salted table as LHS may cause "could not find hash cache for joinId" 
error)

> Once a salted table 's first region has been split, the Join SQL which the 
> salted table as LHS may cause "Could not find hash cache for joinId" error
> -----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2900
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2900
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.7.0
>         Environment: Phoenix-4.7.0-HBase-0.98,HBase-0.98.6-cdh5.3.2
>            Reporter: chenglei
>            Priority: Critical
>
> When I join a salted table (which was split after creation) with another table 
> in my business system, I get the following error: 
> {code:borderStyle=solid} 
>  org.apache.phoenix.exception.PhoenixIOException: 
> org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for 
> joinId: %�����2. The cache might have expired and have been removed.
>       at 
> org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
>       at 
> org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
>       at 
> org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
>       at 
> org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
>       at 
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
>       at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
>       at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
>       at 
> org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
>       at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:111)
>       at 
> org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:127)
>       at 
> org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:108)
>       at 
> org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:103)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>       at 
> org.apache.phoenix.job.JobManager$InstrumentedJobFutureTask.run(JobManager.java:183)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: 
> org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for 
> joinId: %�����2. The cache might have expired and have been removed.
>       at 
> org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
>       at 
> org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
>       at 
> org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
>       at 
> org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
>       at 
> org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
>       at 
> org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
>       at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
>       at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
>       at 
> org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
>       at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
>       at java.lang.Thread.run(Thread.java:745)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>       at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>       at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
>       at 
> org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:304)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:316)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:164)
>       at 
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
>       at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
>       at 
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:283)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:188)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:183)
>       at 
> org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:110)
>       at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:739)
>       at 
> org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:123)
>       ... 7 more
> {code}
> I wrote a unit test to reproduce this problem, as follows: 
> {code:borderStyle=solid} 
>      public void testSaltTableJoinError() throws Exception
>      {
>               //1.create LHS  SALT_TEST salted table
>               this.jdbcTemplate.update("drop table if exists SALT_TEST ");
>               this.jdbcTemplate.update(
>                               "create table SALT_TEST"+
>                               "("+
>                               "id UNSIGNED_INT not null primary key,"+
>                               "appId VARCHAR"+
>                       ")SALT_BUCKETS=2");
>               
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(1,'app1')");
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(2,'app2')");
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(3,'app3')");
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(4,'app4')");
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(5,'app5')");
>               this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(6,'app6')");
>               
>               //2. split SALT_TEST at rowKey3, i.e., split the first region
>               byte[] id3=Bytes.toBytes(3);
>               byte[] rowKey3=new byte[1+4];
>               System.arraycopy(id3, 0, rowKey3, 1, 4);
>               byte salt3=SaltingUtil.getSaltingByte(rowKey3, 1, rowKey3.length-1, 2);
>               rowKey3[0]=salt3;
>               HBaseAdmin hbaseAdmin=this.getHBaseAdmin();
>               hbaseAdmin.split(Bytes.toBytes("SALT_TEST"), rowKey3);
>               
>               //3. wait for the SALT_TEST split to complete
>               while(hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).size() < 3)
>               {
>                       Thread.sleep(1000);
>               }
>               //4. make sure region0 and region1 are not on the same region server
>               HRegionInfo region1Info=hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).get(1);
>               String region1EncodedName=region1Info.getEncodedName();
>               System.out.println(region1EncodedName);
>               hbaseAdmin.move(Bytes.toBytes(region1EncodedName), null);
>               
>               
>               //5.create RHS RIGHT_TEST table
>               this.jdbcTemplate.update("drop table if exists RIGHT_TEST ");
>               this.jdbcTemplate.update(
>                               "create table RIGHT_TEST"+
>                               "("+
>                               "appId VARCHAR not null primary key,"+
>                               "createTime VARCHAR"+
>                       ")");
>               this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app2','201601')");
>               this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app3','201602')");
>               this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app4','201603')");
>               this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app5','201604')");
>               
>               //6. join the salted table SALT_TEST with RIGHT_TEST; this throws the exception
>               String sql="select * from SALT_TEST a inner join RIGHT_TEST b on a.appId=b.appId where a.id>=3 and a.id<=5";
>               List<Map<String,Object>> result=this.jdbcTemplate.queryForList(sql, new Object[0]);
>               System.out.println(result);
>       }
> {code}
> I think this problem is caused by the following code in 
> org.apache.phoenix.compile.ScanRanges (around line 178):
> {code:borderStyle=solid} 
>          if (useSkipScanFilter && isSalted && !isPointLookup) {
>               ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
>         }
> {code}
> In my opinion, the above if statement should be: if (isSalted && 
> !isPointLookup).
> Another question is why the join SQL executes normally when the salted 
> table's first region has not been split. I think this is caused by a subtle 
> code error in the addServerCache method of the 
> org.apache.phoenix.cache.ServerCacheClient class:
> {code:borderStyle=solid} 
>  171          for (HRegionLocation entry : locations) {
>  172              // Keep track of servers we've sent to and only send once
>  173              byte[] regionStartKey = entry.getRegionInfo().getStartKey();
>  174              byte[] regionEndKey = entry.getRegionInfo().getEndKey();
>  175              if ( ! servers.contains(entry) && 
>  176                      keyRanges.intersects(regionStartKey, regionEndKey,
>  177                              cacheUsingTable.getIndexType() == IndexType.LOCAL ? 
>  178                                  ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {  
>  179                  // Call RPC once per server
>  180                  servers.add(entry);
>  181                  if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
>  182                  final byte[] key = entry.getRegionInfo().getStartKey();
>  183                  final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
>  184                  closeables.add(htable);
>  185                  futures.add(executor.submit(new JobCallable<Boolean>() {
>  186                      
>  187                      @Override
>  188                      public Boolean call() throws Exception {
>  189                          final Map<byte[], AddServerCacheResponse> results;
>  190                          try {
>  191                              results = htable.coprocessorService(ServerCachingService.class, key, key, 
> {code}
> In the above code, if the first region has not been split, the first region 
> (-∞,\x01\x00\x00\x00\x00) always passes the if test on line 176, and its 
> start key is byte[0] (an empty array). So on line 191, when we invoke the 
> htable's coprocessorService method, the startKey and endKey parameters are 
> both byte[0]. However, in the 
> org.apache.hadoop.hbase.client.HTable.getKeysAndRegionsInRange method shown 
> below, when the endKey parameter equals HConstants.EMPTY_END_ROW (which is 
> also byte[0]), the boolean variable endKeyIsEndOfTable is set to true, which 
> causes all regions to be returned. That may not be what was intended. As a 
> result, the RHS is sent to all regions of the LHS, and the join SQL runs 
> normally:
> {code:borderStyle=solid} 
> private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
>       final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
>       final boolean reload) throws IOException {
>     final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
>     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
>       throw new IllegalArgumentException(
>         "Invalid range: " + Bytes.toStringBinary(startKey) +
>         " > " + Bytes.toStringBinary(endKey));
>     }
>     List<byte[]> keysInRange = new ArrayList<byte[]>();
>     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
>     byte[] currentKey = startKey;
>     do {
>       HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
>       keysInRange.add(currentKey);
>       regionsInRange.add(regionLocation);
>       currentKey = regionLocation.getRegionInfo().getEndKey();
>     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
>         && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
>             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
>     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange,
>         regionsInRange);
>   }
> {code}
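
The effect of passing byte[0] as both startKey and endKey can be illustrated without a cluster. The following is only a minimal sketch, not HBase or Phoenix code: the Region class, the locate helper and the region names are hypothetical stand-ins, the loop mirrors the getKeysAndRegionsInRange logic quoted above, and the region layout assumes SALT_TEST after the split in the unit test (split point taken as salt byte 0 followed by the 4-byte id 3). It also assumes includeEndKey is true for this call path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EmptyKeyRangeSketch {
    static final byte[] EMPTY = new byte[0];

    /** Hypothetical stand-in for a region covering [start, end); an empty end means end of table. */
    static final class Region {
        final String name;
        final byte[] start;
        final byte[] end;
        Region(String name, byte[] start, byte[] end) {
            this.name = name;
            this.start = start;
            this.end = end;
        }
    }

    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length;
    }

    /** Finds the region whose [start, end) range contains the key. */
    static Region locate(List<Region> regions, byte[] key) {
        for (Region r : regions) {
            boolean afterStart = compare(key, r.start) >= 0;
            boolean beforeEnd = r.end.length == 0 || compare(key, r.end) < 0;
            if (afterStart && beforeEnd) return r;
        }
        throw new IllegalStateException("no region contains the key");
    }

    /** Mirrors the loop of the quoted method, returning region names only. */
    static List<String> regionsInRange(List<Region> regions, byte[] startKey,
                                       byte[] endKey, boolean includeEndKey) {
        boolean endKeyIsEndOfTable = endKey.length == 0;
        if (compare(startKey, endKey) > 0 && !endKeyIsEndOfTable) {
            throw new IllegalArgumentException("Invalid range");
        }
        List<String> hit = new ArrayList<>();
        byte[] currentKey = startKey;
        do {
            Region r = locate(regions, currentKey);
            hit.add(r.name);
            currentKey = r.end;
        } while (currentKey.length != 0
                && (endKeyIsEndOfTable || compare(currentKey, endKey) < 0
                    || (includeEndKey && compare(currentKey, endKey) == 0)));
        return hit;
    }

    /** Assumed layout of SALT_TEST (SALT_BUCKETS=2) after the first region is split. */
    static List<Region> splitSaltTest() {
        byte[] splitPoint = {0, 0, 0, 0, 3};
        byte[] bucket1 = {1, 0, 0, 0, 0};
        return Arrays.asList(
                new Region("region0", EMPTY, splitPoint),
                new Region("region1", splitPoint, bucket1),
                new Region("region2", bucket1, EMPTY));
    }

    public static void main(String[] args) {
        List<Region> regions = splitSaltTest();
        // key is the empty start key of the first region: every region is returned
        System.out.println(regionsInRange(regions, EMPTY, EMPTY, true)); // [region0, region1, region2]
        // key is the non-empty start key of region1: only that region is returned
        byte[] region1Start = regions.get(1).start;
        System.out.println(regionsInRange(regions, region1Start, region1Start, true)); // [region1]
    }
}
```

In this sketch the empty key behaves like a full-table range, which would explain why the cache reaches every region only when the first region is among those that pass the intersects test.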



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
