[
https://issues.apache.org/jira/browse/HBASE-18027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025944#comment-16025944
]
Ashu Pachauri commented on HBASE-18027:
---------------------------------------
bq. Instead we try to avoid creating an overlarge RPC by setting the
replication queue capacity limit to the lesser of
replication.source.size.capacity or 95% of the RPC size limit.
I think this does not solve the underlying problem and can still result in
stuck replication due to oversized requests. I was also reading through the
rest of the discussion here. Handling the RPC size limit inside
ReplicationSource does not make much sense, because the endpoint partitions
the batch before making the RPC; the endpoint is the right place to enforce
the size limit. Changing the batching strategy in the endpoint actually gives
us two benefits:
1. The purpose of this jira, i.e. enforcing the RPC size limit.
2. Better replication performance. Currently, within a batch, no more than a
single thread ships edits for any given region, so if a region receives heavy
write traffic on the source cluster, replication performs very poorly.
As for the implementation, I don't think you need to maintain much state in
the endpoint to enforce the RPC limit. All that needs to be done is to
partition a single batch into multiple batches whenever it would exceed the
limit. Something like the following would work:
{code}
private List<List<Entry>> createBatches(List<Entry> entries) {
  // Cap each batch at 95% of the server's max request size to leave headroom.
  int maxBatchSize =
      (int) (0.95 * conf.getInt(RpcServer.MAX_REQUEST_SIZE,
          RpcServer.DEFAULT_MAX_REQUEST_SIZE));
  int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
  // Number of partitions: bounded by the thread pool size, the number of
  // 100-entry chunks available, and the number of sinks.
  int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
  // Maintains the current batch for a given partition index
  Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
  List<List<Entry>> entryLists = new ArrayList<>();
  int[] sizes = new int[n];
  for (int i = 0; i < n; i++) {
    entryMap.put(i, new ArrayList<Entry>(entries.size() / n + 1));
  }
  for (Entry e : entries) {
    // Edits for the same region always hash to the same partition, which
    // preserves per-region ordering.
    int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
    int entrySize = estimatedSize(e);
    // If adding this entry would push the batch over the limit, add the
    // current batch to the final list and start a new empty one.
    if (sizes[index] + entrySize > maxBatchSize) {
      entryLists.add(entryMap.get(index));
      entryMap.put(index, new ArrayList<Entry>());
      sizes[index] = 0;
    }
    entryMap.get(index).add(e);
    sizes[index] += entrySize;
  }
  entryLists.addAll(entryMap.values());
  return entryLists;
}
{code}
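One detail the sketch leaves open is the size estimate. A minimal sketch of
what estimatedSize could look like, assuming the key's
estimatedSerializedSizeOf() plus the edit's heapSize() is an acceptable proxy
for an entry's contribution to the request payload (illustrative only, not
part of any patch here):
{code}
// Illustrative only: approximate an entry's contribution to the RPC payload.
// Assumes key serialized size plus edit heap size is a good-enough proxy.
private int estimatedSize(Entry e) {
  long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().heapSize();
  return size > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) size;
}
{code}
Using heap size overestimates the wire size, which errs on the safe side for
staying under the cap.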
> Replication should respect RPC size limits when batching edits
> --------------------------------------------------------------
>
> Key: HBASE-18027
> URL: https://issues.apache.org/jira/browse/HBASE-18027
> Project: HBase
> Issue Type: Bug
> Components: Replication
> Affects Versions: 2.0.0, 1.4.0, 1.3.1
> Reporter: Andrew Purtell
> Assignee: Andrew Purtell
> Fix For: 2.0.0, 1.4.0, 1.3.2
>
> Attachments: HBASE-18027-branch-1.patch, HBASE-18027.patch,
> HBASE-18027.patch, HBASE-18027.patch, HBASE-18027.patch, HBASE-18027.patch
>
>
> In HBaseInterClusterReplicationEndpoint#replicate we try to replicate in
> batches. We create N lists. N is the minimum of configured replicator
> threads, number of 100-waledit batches, or number of current sinks. Every
> pending entry in the replication context is then placed in order by hash of
> encoded region name into one of these N lists. Each of the N lists is then
> sent all at once in one replication RPC. We do not check whether the sum of
> the data in each of the N lists will exceed the RPC size limit. This code
> presumes each individual edit is reasonably small. Not checking the aggregate
> size while assembling the lists into RPCs is an oversight and can lead to
> replication failure when that assumption is violated.
> We can fix this by generating as many replication RPC calls as we need to
> drain a list, keeping each RPC under the limit, instead of assuming the whole
> list will fit in one.
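> A minimal sketch of that drain loop, assuming estimatedSize() approximates an
> entry's payload contribution and shipBatch() stands in for the existing RPC
> plumbing (both helpers hypothetical):
> {code}
> // Sketch only: drain one partition's list into as many RPCs as needed,
> // keeping each RPC under the size limit. shipBatch() and estimatedSize()
> // are hypothetical stand-ins for the endpoint's existing helpers.
> private void drainList(List<Entry> list, int maxBatchSize) throws IOException {
>   List<Entry> batch = new ArrayList<>();
>   int batchSize = 0;
>   for (Entry e : list) {
>     int entrySize = estimatedSize(e);
>     // Ship before adding would push the batch over the limit; always keep
>     // at least one entry per batch so progress is made even on huge edits.
>     if (!batch.isEmpty() && batchSize + entrySize > maxBatchSize) {
>       shipBatch(batch);
>       batch = new ArrayList<>();
>       batchSize = 0;
>     }
>     batch.add(e);
>     batchSize += entrySize;
>   }
>   if (!batch.isEmpty()) {
>     shipBatch(batch);
>   }
> }
> {code}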