infraio commented on a change in pull request #1722:
URL: https://github.com/apache/hbase/pull/1722#discussion_r434253468
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
##########
@@ -279,28 +296,148 @@ private int getEstimatedEntrySize(Entry e) {
}
}
- private TableName parseTable(String msg) {
- // ... TableNotFoundException: '<table>'/n...
- Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
- Matcher m = p.matcher(msg);
- if (m.find()) {
- String table = m.group(1);
- try {
- // double check that table is a valid table name
-
TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
- return TableName.valueOf(table);
- } catch (IllegalArgumentException ignore) {
+ /**
+ * Check if there's an {@link TableNotFoundException} in the caused by
stacktrace.
+ */
+ @VisibleForTesting
+ public static boolean isTableNotFoundException(Throwable io) {
+ if (io instanceof RemoteException) {
+ io = ((RemoteException) io).unwrapRemoteException();
+ }
+ if (io != null && io.getMessage().contains("TableNotFoundException")) {
+ return true;
+ }
+ for (; io != null; io = io.getCause()) {
+ if (io instanceof TableNotFoundException) {
+ return true;
}
}
- return null;
+ return false;
}
- // Filter a set of batches by TableName
- private List<List<Entry>> filterBatches(final List<List<Entry>>
oldEntryList, TableName table) {
- return oldEntryList
- .stream().map(entries -> entries.stream()
- .filter(e ->
!e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
- .collect(Collectors.toList());
+ /**
+ * Check if there's an {@link NoSuchColumnFamilyException} in the caused by
stacktrace.
+ */
+ @VisibleForTesting
+ public static boolean isNoSuchColumnFamilyException(Throwable io) {
+ if (io instanceof RemoteException) {
+ io = ((RemoteException) io).unwrapRemoteException();
+ }
+ if (io != null && io.getMessage().contains("NoSuchColumnFamilyException"))
{
+ return true;
+ }
+ for (; io != null; io = io.getCause()) {
+ if (io instanceof NoSuchColumnFamilyException) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ List<List<Entry>> filterNotExistTableEdits(final List<List<Entry>>
oldEntryList) {
+ List<List<Entry>> entryList = new ArrayList<>();
+ Map<TableName, Boolean> existMap = new HashMap<>();
+ try (Connection localConn =
ConnectionFactory.createConnection(ctx.getLocalConfiguration());
+ Admin localAdmin = localConn.getAdmin()) {
+ for (List<Entry> oldEntries : oldEntryList) {
+ List<Entry> entries = new ArrayList<>();
+ for (Entry e : oldEntries) {
+ TableName tableName = e.getKey().getTableName();
+ boolean exist = true;
+ if (existMap.containsKey(tableName)) {
+ exist = existMap.get(tableName);
+ } else {
+ try {
+ exist = localAdmin.tableExists(tableName);
+ existMap.put(tableName, exist);
+ } catch (IOException iox) {
+ LOG.warn("Exception checking for local table " + tableName, iox);
+ // we can't drop edits without full assurance, so we assume
table exists.
+ exist = true;
+ }
+ }
+ if (exist) {
+ entries.add(e);
+ } else {
+ // Would potentially be better to retry in one of the outer loops
+ // and add a table filter there; but that would break the
encapsulation,
+ // so we're doing the filtering here.
+ LOG.warn("Missing table detected at sink, local table also does
not exist, "
+ + "filtering edits for table '{}'", tableName);
+ }
+ }
+ if (!entries.isEmpty()) {
+ entryList.add(entries);
+ }
+ }
+ } catch (IOException iox) {
+ LOG.warn("Exception when creating connection to check local table", iox);
+ return oldEntryList;
+ }
+ return entryList;
+ }
+
+ @VisibleForTesting
+ List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>>
oldEntryList) {
+ List<List<Entry>> entryList = new ArrayList<>();
+ Map<TableName, Set<String>> existColumnFamilyMap = new HashMap<>();
+ try (Connection localConn =
ConnectionFactory.createConnection(ctx.getLocalConfiguration());
+ Admin localAdmin = localConn.getAdmin()) {
+ for (List<Entry> oldEntries : oldEntryList) {
+ List<Entry> entries = new ArrayList<>();
+ for (Entry e : oldEntries) {
+ TableName tableName = e.getKey().getTableName();
+ if (!existColumnFamilyMap.containsKey(tableName)) {
+ try {
+ Set<String> cfs =
localAdmin.getDescriptor(tableName).getColumnFamilyNames().stream()
+ .map(Bytes::toString).collect(Collectors.toSet());
+ existColumnFamilyMap.put(tableName, cfs);
+ } catch (Exception ex) {
+ LOG.warn("Exception getting cf names for local table {}",
tableName, ex);
+ // if catch any exception, we are not sure about table's
description,
+ // so replicate raw entry
+ entries.add(e);
+ continue;
+ }
+ }
+
+ Set<String> existColumnFamilies =
existColumnFamilyMap.get(tableName);
+ Set<String> missingCFs = new HashSet<>();
+ WALEdit walEdit = new WALEdit();
+ walEdit.getCells().addAll(e.getEdit().getCells());
+ WALUtil.filterCells(walEdit, cell -> {
+ String cf = Bytes.toString(CellUtil.cloneFamily(cell));
+ if (existColumnFamilies.contains(cf)) {
+ return cell;
+ } else {
+ missingCFs.add(cf);
+ return null;
+ }
+ });
+ if (!walEdit.isEmpty()) {
+ Entry newEntry = new Entry(e.getKey(), walEdit);
+ entries.add(newEntry);
+ }
+
+ if (!missingCFs.isEmpty()) {
+ // Would potentially be better to retry in one of the outer loops
+ // and add a table filter there; but that would break the
encapsulation,
+ // so we're doing the filtering here.
+ LOG.warn(
+ "Missing column family detected at sink, local column family
also does not exist,"
Review comment:
Nit: add a whitespace before "filtering". Also, it seems a ' is missing
around the column family {} placeholder?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]