This is an automated email from the ASF dual-hosted git repository. vaughn pushed a commit to branch fix_zyxxoo in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-toolchain.git
commit 6e05f9dd7cf7108445a6abe569545601d83eb375
Author: vaughn.zhang <[email protected]>
AuthorDate: Mon Jan 29 17:01:32 2024 +0800

    fix: concurrency issue causing file overwrite due to identical filenames
---
 .../loader/direct/loader/HBaseDirectLoader.java    | 68 +++++++++++++++++++++-
 1 file changed, 66 insertions(+), 2 deletions(-)

diff --git a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
index fd1a0236..3526b5cb 100644
--- a/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
+++ b/hugegraph-loader/src/main/java/org/apache/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -18,10 +18,13 @@
 package org.apache.hugegraph.loader.direct.loader;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -59,8 +62,69 @@ public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyV
     private SinkToHBase sinkToHBase;
     private LoadDistributeMetrics loadDistributeMetrics;
 
+    private static final int RANDOM_VALUE1;
+    private static final short RANDOM_VALUE2;
+    private static final AtomicInteger NEXT_COUNTER;
+
     public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
 
+    static {
+        SecureRandom secureRandom = new SecureRandom();
+        RANDOM_VALUE1 = secureRandom.nextInt(0x01000000);
+        RANDOM_VALUE2 = (short) secureRandom.nextInt(0x00008000);
+        NEXT_COUNTER = new AtomicInteger(secureRandom.nextInt());
+    }
+
+    private static byte int3(final int x) {
+        return (byte) (x >> 24);
+    }
+
+    private static byte int2(final int x) {
+        return (byte) (x >> 16);
+    }
+
+    private static byte int1(final int x) {
+        return (byte) (x >> 8);
+    }
+
+    private static byte int0(final int x) {
+        return (byte) (x);
+    }
+
+    private static byte short1(final short x) {
+        return (byte) (x >> 8);
+    }
+
+    private static byte short0(final short x) {
+        return (byte) (x);
+    }
+
+    /**
+     * Builds a 12-byte ID (hex-encoded via Bytes.toHex): 4 bytes of epoch
+     * seconds, 5 random bytes fixed at class init, and 3 bytes of a counter.
+     */
+    public static String fileID() {
+        long timeStamp = System.currentTimeMillis() / 1000;
+        // Read the counter once: calling incrementAndGet() per byte would mix
+        // bytes from different values when callers run concurrently.
+        int counter = NEXT_COUNTER.incrementAndGet();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(12);
+        byteBuffer.put(int3((int) timeStamp));
+        byteBuffer.put(int2((int) timeStamp));
+        byteBuffer.put(int1((int) timeStamp));
+        byteBuffer.put(int0((int) timeStamp));
+        byteBuffer.put(int2(RANDOM_VALUE1));
+        byteBuffer.put(int1(RANDOM_VALUE1));
+        byteBuffer.put(int0(RANDOM_VALUE1));
+        byteBuffer.put(short1(RANDOM_VALUE2));
+        byteBuffer.put(short0(RANDOM_VALUE2));
+        byteBuffer.put(int2(counter));
+        byteBuffer.put(int1(counter));
+        byteBuffer.put(int0(counter));
+
+        return Bytes.toHex(byteBuffer.array());
+    }
+
     public HBaseDirectLoader(LoadOptions loadOptions,
                              InputStruct struct,
                              LoadDistributeMetrics loadDistributeMetrics) {
@@ -144,8 +208,8 @@ public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyV
 
     public String getHFilePath(Configuration conf) throws IOException {
         FileSystem fs = FileSystem.get(conf);
-        long timeStr = System.currentTimeMillis();
-        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + timeStr + "/";
+        String fileID = fileID();
+        String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen" + "/" + fileID + "/";
         Path hfileGenPath = new Path(pathStr);
         if (fs.exists(hfileGenPath)) {
             LOG.info("\n Delete the path where the hfile is generated,path {} ", pathStr);
