shaofengshi closed pull request #426: KYLIN-1575 Timeline-consistent High
Available Reads for metadata table
URL: https://github.com/apache/kylin/pull/426
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b63062e31a..615ebccdec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1929,4 +1929,13 @@ public int getJdbcResourceStoreMaxCellSize() {
public String getJdbcSourceAdaptor() {
return getOptional("kylin.source.jdbc.adaptor");
}
+
+ public int getHBaseResourceStoreReplication() {
+ try {
+ return
Integer.parseInt(getOptional("kylin.metadata.hbase-replication", "1"));
+ } catch (NumberFormatException e) {
+ logger.error("Invalid number of
'kylin.metadata.hbase-replication'", e);
+ return 1;
+ }
+ }
}
diff --git a/core-common/src/main/resources/kylin-defaults.properties
b/core-common/src/main/resources/kylin-defaults.properties
index 6f2db9a903..dd667b4791 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -23,6 +23,9 @@ kylin.metadata.url=kylin_metadata@hbase
# metadata cache sync retry times
kylin.metadata.sync-retries=3
+# metadata replica in hbase region servers
+kylin.metadata.hbase-replication=1
+
# Working folder in HDFS, better be qualified absolute path, make sure user
has the right permission to this directory
kylin.env.hdfs-working-dir=/kylin
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 53e8a686f7..d34d5c7c22 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -300,11 +300,12 @@ public static void createHTableIfNeeded(Connection conn,
String table, String...
TableName tableName = TableName.valueOf(table);
DistributedLock lock = null;
String lockPath = getLockPath(table);
-
+ int replication =
KylinConfig.getInstanceFromEnv().getHBaseResourceStoreReplication();
try {
if (tableExists(conn, table)) {
logger.debug("HTable '" + table + "' already exists");
- Set<String> existingFamilies =
getFamilyNames(admin.getTableDescriptor(tableName));
+ HTableDescriptor hTableDescriptor =
admin.getTableDescriptor(tableName);
+ Set<String> existingFamilies =
getFamilyNames(hTableDescriptor);
boolean wait = false;
for (String family : families) {
if (existingFamilies.contains(family) == false) {
@@ -321,6 +322,14 @@ public static void createHTableIfNeeded(Connection conn,
String table, String...
logger.warn("", e);
}
}
+
+ if (hTableDescriptor.getRegionReplication() != replication) {
+ logger.debug("Update HTable '" + table + "' replication to
{}", replication);
+ hTableDescriptor.setRegionReplication(replication);
+ admin.disableTable(tableName);
+ admin.modifyTable(tableName, hTableDescriptor);
+ admin.enableTable(tableName);
+ }
return;
}
@@ -344,6 +353,7 @@ public static void createHTableIfNeeded(Connection conn,
String table, String...
}
}
+ desc.setRegionReplication(replication);
admin.createTable(desc);
logger.debug("HTable '" + table + "' created");
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 14c5ea7cf0..cb80aa6aa6 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -183,6 +184,7 @@ private void visitFolder(String folderPath, VisitFilter
filter, boolean loadCont
Table table = getConnection().getTable(TableName.valueOf(tableName));
Scan scan = new Scan(startRow, endRow);
+ scan.setConsistency(Consistency.TIMELINE);
scan.addColumn(B_FAMILY, B_COLUMN_TS);
if (loadContent) {
scan.addColumn(B_FAMILY, B_COLUMN);
@@ -199,6 +201,9 @@ private void visitFolder(String folderPath, VisitFilter
filter, boolean loadCont
for (Result r : scanner) {
String path = Bytes.toString(r.getRow());
assert path.startsWith(lookForPrefix);
+ if (r.isStale()) {
+ logger.warn("Kylin reads stale data at: {}", path);
+ }
int cut = path.indexOf('/', folderPrefix.length());
String directChild = cut < 0 ? path : path.substring(0, cut);
visitor.visit(directChild, path, r);
@@ -395,6 +400,7 @@ private Result internalGetFromHTable(Table table, String
path, boolean fetchCont
byte[] rowkey = Bytes.toBytes(path);
Get get = new Get(rowkey);
+ get.setConsistency(Consistency.TIMELINE);
if (!fetchContent && !fetchTimestamp) {
get.setCheckExistenceOnly(true);
@@ -406,6 +412,9 @@ private Result internalGetFromHTable(Table table, String
path, boolean fetchCont
}
Result result = table.get(get);
+ if (result.isStale()) {
+ logger.warn("Kylin reads stale data at: {}", path);
+ }
boolean exists = result != null && (!result.isEmpty() ||
(result.getExists() != null && result.getExists()));
return exists ? result : null;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services