wchevreuil commented on code in PR #5121:
URL: https://github.com/apache/hbase/pull/5121#discussion_r1141960626


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java:
##########
@@ -768,13 +782,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
         .withChecksumType(StoreUtils.getChecksumType(conf))
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
-        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        .build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+
+          // init halfwriter
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (null == hRegionLocation) {
+              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));

Review Comment:
   What does "default writer" mean here? A writer that doesn't take region 
location into account? Also, my impression here is that this is an unexpected 
behaviour. If that's the case, let's raise the log level to info or warn.
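
   Something like this, as a sketch (assuming the location miss really is unexpected, and keeping the diff's null-check style):

   ```java
   // Hypothetical wording: log at WARN instead of TRACE, and spell out that
   // the fallback is a plain StoreFileWriter without favored-nodes hints.
   if (null == hRegionLocation) {
     LOG.warn("Could not get region location for row {}; falling back to a writer "
       + "without favored nodes", Bytes.toString(rowKey));
   }
   ```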



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java:
##########
@@ -768,13 +782,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
         .withChecksumType(StoreUtils.getChecksumType(conf))
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
-        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        .build();

Review Comment:
   Why was the create time removed?
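
   If it was dropped unintentionally, restoring the original chained call should be straightforward, e.g.:

   ```java
   // The create time the old code set explicitly on the HFileContext builder:
   .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
   .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
   ```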



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java:
##########
@@ -768,13 +782,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
         .withChecksumType(StoreUtils.getChecksumType(conf))
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
-        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        .build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+
+          // init halfwriter
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {

Review Comment:
   Do we really need to do this on every cell read? Sounds like a waste to me.
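
   For illustration, the flag could be read once before the loop; a rough sketch using the names from this diff (`createHalfWriter` is just a placeholder for the init block below, and the loop condition is assumed to be the usual `scanner.next()`):

   ```java
   // Read the locality flag once, outside the per-cell loop.
   final boolean localitySensitive =
     conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE);
   HFileScanner scanner = halfReader.getScanner(false, false, false);
   scanner.seekTo();
   do {
     final Cell cell = scanner.getCell();
     if (null == halfWriter) {
       // Lazily build the writer from the first cell (resolving favored nodes
       // only when locality-sensitive), then fall through to append it.
       halfWriter = createHalfWriter(localitySensitive, cell);
     }
     halfWriter.append(cell);
   } while (scanner.next());
   ```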



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java:
##########
@@ -768,13 +782,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
         .withChecksumType(StoreUtils.getChecksumType(conf))
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
-        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        .build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+
+          // init halfwriter
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (null == hRegionLocation) {
+              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
+            } else {
+              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+              InetSocketAddress initialIsa =
+                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+              if (initialIsa.isUnresolved()) {
+                LOG.trace("Failed resolve address {}, use default writer",

Review Comment:
   Same as previous comment.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java:
##########
@@ -768,13 +782,50 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
         .withChecksumType(StoreUtils.getChecksumType(conf))
         .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
         .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
-        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        .build();
+
       HFileScanner scanner = halfReader.getScanner(false, false, false);
       scanner.seekTo();
       do {
-        halfWriter.append(scanner.getCell());
+        final Cell cell = scanner.getCell();
+        if (null != halfWriter) {
+          halfWriter.append(cell);
+        } else {
+
+          // init halfwriter
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+            InetSocketAddress[] favoredNodes = null;
+            if (null == hRegionLocation) {
+              LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
+            } else {
+              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+              InetSocketAddress initialIsa =
+                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+              if (initialIsa.isUnresolved()) {
+                LOG.trace("Failed resolve address {}, use default writer",
+                  hRegionLocation.getHostnamePort());
+              } else {
+                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
+                favoredNodes = new InetSocketAddress[] { initialIsa };
+              }
+            }
+            if (null == favoredNodes) {
+              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+            } else {
+              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+                .withBloomType(bloomFilterType).withFileContext(hFileContext)
+                .withFavoredNodes(favoredNodes).build();
+            }

Review Comment:
   Why this extra if/else block? Just create the writer accordingly in each of the if/else branches above.
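
   Or, as an alternative to duplicating the builder in each branch, a single shared builder would remove the trailing null check entirely (a sketch, untested):

   ```java
   // One builder; favored nodes added only when they were resolved above.
   StoreFileWriter.Builder builder =
     new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
       .withBloomType(bloomFilterType).withFileContext(hFileContext);
   if (null != favoredNodes) {
     builder.withFavoredNodes(favoredNodes);
   }
   halfWriter = builder.build();
   ```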


