[ https://issues.apache.org/jira/browse/PHOENIX-2900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433203#comment-15433203 ]
ASF GitHub Bot commented on PHOENIX-2900:
-----------------------------------------
Github user comnetwork closed the pull request at:
https://github.com/apache/phoenix/pull/180
> Unable to find hash cache once a salted table's first region has split
> -----------------------------------------------------------------------
>
> Key: PHOENIX-2900
> URL: https://issues.apache.org/jira/browse/PHOENIX-2900
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 4.7.0
> Reporter: chenglei
> Assignee: chenglei
> Priority: Critical
> Fix For: 4.8.0
>
> Attachments: PHOENIX-2900_addendum1.patch, PHOENIX-2900_v1.patch,
> PHOENIX-2900_v2.patch, PHOENIX-2900_v3.patch, PHOENIX-2900_v4.patch,
> PHOENIX-2900_v5.patch, PHOENIX-2900_v6.patch, PHOENIX-2900_v7.patch
>
>
> When I join a salted table (which has been split after creation) with another
> table in my business system, I get the following error, even though I clear the
> salted table's TableRegionCache:
> {code:borderStyle=solid}
> org.apache.phoenix.exception.PhoenixIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
> at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
> at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
> at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
> at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
> at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
> at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
> at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
> at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
> at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
> at java.lang.Thread.run(Thread.java:745)
> at org.apache.phoenix.util.ServerUtil.parseServerException(ServerUtil.java:111)
> at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:127)
> at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:108)
> at org.apache.phoenix.iterate.ParallelIterators$1.call(ParallelIterators.java:103)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at org.apache.phoenix.job.JobManager$InstrumentedJobFutureTask.run(JobManager.java:183)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: Could not find hash cache for joinId: %�����2. The cache might have expired and have been removed.
> at org.apache.phoenix.coprocessor.HashJoinRegionScanner.<init>(HashJoinRegionScanner.java:98)
> at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:218)
> at org.apache.phoenix.coprocessor.BaseScannerRegionObserver.postScannerOpen(BaseScannerRegionObserver.java:201)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$52.call(RegionCoprocessorHost.java:1203)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$RegionOperation.call(RegionCoprocessorHost.java:1517)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperation(RegionCoprocessorHost.java:1592)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.execOperationWithResult(RegionCoprocessorHost.java:1556)
> at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postScannerOpen(RegionCoprocessorHost.java:1198)
> at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3178)
> at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29925)
> at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2031)
> at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
> at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:116)
> at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:96)
> at java.lang.Thread.run(Thread.java:745)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
> at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:304)
> at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:316)
> at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:164)
> at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
> at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
> at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
> at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:283)
> at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:188)
> at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:183)
> at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:110)
> at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:739)
> at org.apache.phoenix.iterate.TableResultIterator.initScanner(TableResultIterator.java:123)
> ... 7 more
> {code}
> I wrote a unit test to reproduce this error, as follows:
> {code:borderStyle=solid}
> public void testSaltTableJoinError() throws Exception {
>     //1. create the LHS salted table SALT_TEST
>     this.jdbcTemplate.update("drop table if exists SALT_TEST ");
>     this.jdbcTemplate.update(
>             "create table SALT_TEST"+
>             "("+
>             "id UNSIGNED_INT not null primary key,"+
>             "appId VARCHAR"+
>             ")SALT_BUCKETS=2");
>
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(1,'app1')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(2,'app2')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(3,'app3')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(4,'app4')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(5,'app5')");
>     this.jdbcTemplate.update("upsert into SALT_TEST(id,appId) values(6,'app6')");
>
>     //2. split SALT_TEST at the row key of id 3, i.e., split the first region
>     byte[] id3 = Bytes.toBytes(3);
>     byte[] rowKey3 = new byte[1 + 4];
>     System.arraycopy(id3, 0, rowKey3, 1, 4);
>     byte salt3 = SaltingUtil.getSaltingByte(rowKey3, 1, rowKey3.length - 1, 2);
>     rowKey3[0] = salt3;
>     HBaseAdmin hbaseAdmin = this.getHBaseAdmin();
>     hbaseAdmin.split(Bytes.toBytes("SALT_TEST"), rowKey3);
>
>     //3. wait for the SALT_TEST split to complete
>     while (hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).size() < 3) {
>         Thread.sleep(1000);
>     }
>
>     //4. make sure region0 and region1 are not on the same region server,
>     //   so they do not share the GlobalCache
>     HRegionInfo region1Info = hbaseAdmin.getTableRegions(Bytes.toBytes("SALT_TEST")).get(1);
>     String region1EncodedName = region1Info.getEncodedName();
>     System.out.println(region1EncodedName);
>     hbaseAdmin.move(Bytes.toBytes(region1EncodedName), null);
>
>     //5. create the RHS table RIGHT_TEST
>     this.jdbcTemplate.update("drop table if exists RIGHT_TEST ");
>     this.jdbcTemplate.update(
>             "create table RIGHT_TEST"+
>             "("+
>             "appId VARCHAR not null primary key,"+
>             "createTime VARCHAR"+
>             ")");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app2','201601')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app3','201602')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app4','201603')");
>     this.jdbcTemplate.update("upsert into RIGHT_TEST(appId,createTime) values('app5','201604')");
>
>     //6. clear SALT_TEST's TableRegionCache, so the join sees SALT_TEST's newest 3 regions
>     ((PhoenixConnection)this.dataSource.getConnection())
>             .getQueryServices().clearTableRegionCache(Bytes.toBytes("SALT_TEST"));
>
>     //7. join the salted table SALT_TEST with RIGHT_TEST; this throws the exception
>     String sql = "select * from SALT_TEST a inner join RIGHT_TEST b on a.appId=b.appId where a.id>=3 and a.id<=5";
>     List<Map<String,Object>> result = this.jdbcTemplate.queryForList(sql, new Object[0]);
>     assertTrue(result.size() == 3);
> }
> {code}
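> For context, the jdbcTemplate above appears to be Spring's JdbcTemplate running over the Phoenix JDBC driver. A minimal sketch of the assumed scaffolding might look like the following (the driver class and URL format are the standard Phoenix ones; the class name and ZooKeeper quorum are illustrative only):
> {code:borderStyle=solid}
> // Assumed test scaffolding, not part of the original report: a Spring
> // JdbcTemplate backed by the Phoenix thick driver. "PhoenixJdbcSetup" and
> // the quorum "localhost:2181" are illustrative.
> import org.springframework.jdbc.core.JdbcTemplate;
> import org.springframework.jdbc.datasource.DriverManagerDataSource;
>
> public class PhoenixJdbcSetup {
>     public static JdbcTemplate newJdbcTemplate() {
>         DriverManagerDataSource dataSource = new DriverManagerDataSource();
>         dataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
>         dataSource.setUrl("jdbc:phoenix:localhost:2181");
>         return new JdbcTemplate(dataSource);
>     }
> }
> {code}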
> I debugged the source code and found an error in the addServerCache method of
> the org.apache.phoenix.cache.ServerCacheClient class, shown below. In line 166,
> we get all three regions of the SALT_TEST table, but in line 176 the keyRanges
> is [[[\x00], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]], so only the second
> region [\x00\x00\x00\x00\x03,\x01\x00\x00\x00\x00) passes the keyRanges'
> intersects check in line 176. As a result, the RHS is only sent to the second
> region. However, since SALT_TEST is a salted table, the third region
> [\x01\x00\x00\x00\x00,+∞) also has records matching the where condition, so the
> RHS should be sent to the third region as well. Therefore, the correct
> keyRanges should be [[[\x00-\x01], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]],
> not [[[\x00], [[\x00\x00\x00\x03 - \x00\x00\x00\x05]]]].
> {code:borderStyle=solid}
> 164 try {
> 165     final PTable cacheUsingTable = cacheUsingTableRef.getTable();
> 166     List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
> 167     int nRegions = locations.size();
> 168     // Size these based on worst case
> 169     futures = new ArrayList<Future<Boolean>>(nRegions);
> 170     Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
> 171     for (HRegionLocation entry : locations) {
> 172         // Keep track of servers we've sent to and only send once
> 173         byte[] regionStartKey = entry.getRegionInfo().getStartKey();
> 174         byte[] regionEndKey = entry.getRegionInfo().getEndKey();
> 175         if ( ! servers.contains(entry) &&
> 176                 keyRanges.intersects(regionStartKey, regionEndKey,
> 177                         cacheUsingTable.getIndexType() == IndexType.LOCAL ?
> 178                             ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {
> 179             // Call RPC once per server
> 180             servers.add(entry);
> 181             if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
> 182             final byte[] key = entry.getRegionInfo().getStartKey();
> 183             final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
> 184             closeables.add(htable);
> 185             futures.add(executor.submit(new JobCallable<Boolean>() {
> 186
> 187                 @Override
> 188                 public Boolean call() throws Exception {
> 189                     final Map<byte[], AddServerCacheResponse> results;
> 190                     try {
> 191                         results = htable.coprocessorService(ServerCachingService.class, key, key,
> {code}
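> To make the consequence concrete, here is a small illustrative sketch (using SaltingUtil.getSaltingByte exactly as the unit test above does; the class name SaltBucketCheck is hypothetical). It prints which salt bucket each id selected by the where clause lands in; per the report, at least one of them lands in bucket \x01, i.e., in the third region that the incorrect keyRanges never intersects:
> {code:borderStyle=solid}
> // Illustrative only, assuming the same 2-bucket salting as SALT_TEST.
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.phoenix.schema.SaltingUtil;
>
> public class SaltBucketCheck {
>     public static void main(String[] args) {
>         // For each id in the predicate (3..5), build the salted row key
>         // [salt byte][4-byte UNSIGNED_INT id] and compute its salt byte.
>         // Rows whose salt byte is \x01 live in the third region
>         // [\x01\x00\x00\x00\x00, +inf), which never receives the RHS cache.
>         for (int id = 3; id <= 5; id++) {
>             byte[] rowKey = new byte[1 + 4];
>             System.arraycopy(Bytes.toBytes(id), 0, rowKey, 1, 4);
>             rowKey[0] = SaltingUtil.getSaltingByte(rowKey, 1, rowKey.length - 1, 2);
>             System.out.println("id=" + id + " -> salt bucket " + rowKey[0]);
>         }
>     }
> }
> {code}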
> I think the incorrect keyRanges is caused by the following code in the ctor of
> org.apache.phoenix.compile.ScanRanges (around line 178). In my opinion, the if
> statement should be if (isSalted && !isPointLookup):
> {code:borderStyle=solid}
> if (useSkipScanFilter && isSalted && !isPointLookup) {
> ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
> }
> {code}
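> For clarity, a minimal sketch of that proposed change, dropping the useSkipScanFilter condition so that any salted, non-point-lookup scan expands the salt byte to all buckets (surrounding ctor context omitted):
> {code:borderStyle=solid}
> // Proposed condition as described above; only the if test changes.
> if (isSalted && !isPointLookup) {
>     ranges.set(0, SaltingUtil.generateAllSaltingRanges(bucketNum));
> }
> {code}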
> Another question is why the join SQL executes normally when the salted table's
> first region has not been split. I think that is due to a subtle error in the
> same addServerCache method of the org.apache.phoenix.cache.ServerCacheClient
> class:
> {code:borderStyle=solid}
> 171 for (HRegionLocation entry : locations) {
> 172     // Keep track of servers we've sent to and only send once
> 173     byte[] regionStartKey = entry.getRegionInfo().getStartKey();
> 174     byte[] regionEndKey = entry.getRegionInfo().getEndKey();
> 175     if ( ! servers.contains(entry) &&
> 176             keyRanges.intersects(regionStartKey, regionEndKey,
> 177                     cacheUsingTable.getIndexType() == IndexType.LOCAL ?
> 178                         ScanUtil.getRowKeyOffset(regionStartKey, regionEndKey) : 0, true)) {
> 179         // Call RPC once per server
> 180         servers.add(entry);
> 181         if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
> 182         final byte[] key = entry.getRegionInfo().getStartKey();
> 183         final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
> 184         closeables.add(htable);
> 185         futures.add(executor.submit(new JobCallable<Boolean>() {
> 186
> 187             @Override
> 188             public Boolean call() throws Exception {
> 189                 final Map<byte[], AddServerCacheResponse> results;
> 190                 try {
> 191                     results = htable.coprocessorService(ServerCachingService.class, key, key,
> {code}
> In the above code, if the first region has not been split, the first region
> (-∞,\x01\x00\x00\x00\x00) always passes the if test in line 176, and the
> startKey of the first region is byte[0] (an empty array). So in line 191, when
> we invoke the htable's coprocessorService method, the startKey and endKey
> parameters are both byte[0].
> However, in org.apache.hadoop.hbase.client.HTable's getKeysAndRegionsInRange
> method below, when the endKey parameter equals HConstants.EMPTY_END_ROW (which
> is also byte[0]), the boolean variable endKeyIsEndOfTable is set to true. This
> causes all regions to be returned, which is obviously not as expected. In the
> end, the RHS is sent to all of the LHS's regions, so the join SQL happens to
> run normally:
> {code:borderStyle=solid}
> private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(
>         final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
>         final boolean reload) throws IOException {
>     final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
>     if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
>         throw new IllegalArgumentException(
>             "Invalid range: " + Bytes.toStringBinary(startKey) +
>             " > " + Bytes.toStringBinary(endKey));
>     }
>     List<byte[]> keysInRange = new ArrayList<byte[]>();
>     List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>();
>     byte[] currentKey = startKey;
>     do {
>         HRegionLocation regionLocation = getRegionLocation(currentKey, reload);
>         keysInRange.add(currentKey);
>         regionsInRange.add(regionLocation);
>         currentKey = regionLocation.getRegionInfo().getEndKey();
>     } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
>             && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
>             || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
>     return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange, regionsInRange);
> }
> {code}
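> A tiny demonstration of the pitfall (HConstants.EMPTY_END_ROW is a zero-length byte array, so a start/stop key pair of byte[0]/byte[0] is treated as "to end of table" and the do-while above walks every region; the class name EmptyEndRowDemo is hypothetical):
> {code:borderStyle=solid}
> import org.apache.hadoop.hbase.HConstants;
> import org.apache.hadoop.hbase.util.Bytes;
>
> public class EmptyEndRowDemo {
>     public static void main(String[] args) {
>         // The startKey of an unsplit first region is a zero-length array.
>         // Passing it as both start and stop key to coprocessorService makes
>         // endKey equal HConstants.EMPTY_END_ROW, so getKeysAndRegionsInRange
>         // treats the range as "to end of table" and returns all regions.
>         byte[] firstRegionStartKey = new byte[0];
>         System.out.println(Bytes.equals(firstRegionStartKey, HConstants.EMPTY_END_ROW)); // true
>     }
> }
> {code}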
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)