This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 52c6d77274f HDFS-14478. Add libhdfs APIs for openFile (#4166)
52c6d77274f is described below
commit 52c6d77274fcf0a408914fb88b4402543a687977
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Apr 13 14:15:27 2022 +0100
HDFS-14478. Add libhdfs APIs for openFile (#4166)
Contributed by Sahil Takiar
Change-Id: I2f9e82a69010df7496704754515b031f2a907167
---
.../main/native/libhdfs-tests/test_libhdfs_ops.c | 62 +++
.../src/main/native/libhdfs/hdfs.c | 500 ++++++++++++++++++++-
.../src/main/native/libhdfs/include/hdfs/hdfs.h | 135 ++++++
.../src/main/native/libhdfs/jclasses.c | 4 +
.../src/main/native/libhdfs/jclasses.h | 7 +
.../src/main/native/libhdfspp/tests/hdfs_shim.c | 59 +++
.../libhdfspp/tests/libhdfs_wrapper_defines.h | 17 +
.../libhdfspp/tests/libhdfs_wrapper_undefs.h | 17 +
.../libhdfspp/tests/libhdfspp_wrapper_defines.h | 17 +
9 files changed, 807 insertions(+), 11 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
index 23fa2e51128..b1e64c642ed 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
@@ -454,6 +454,68 @@ int main(int argc, char **argv) {
hdfsCloseFile(lfs, localFile);
}
+
+ {
+     // HDFS Open File Builder tests: open readPath twice through the
+     // builder/future API (once with an untimed get, once with a timed
+     // get) and verify that the expected contents can be read back.
+
+     exists = hdfsExists(fs, readPath);
+
+     if (exists) {
+         fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+         shutdown_and_exit(cl, -1);
+     }
+
+     hdfsOpenFileBuilder *builder;
+     builder = hdfsOpenFileBuilderAlloc(fs, readPath);
+     hdfsOpenFileBuilderOpt(builder, "hello", "world");
+
+     hdfsOpenFileFuture *future;
+     future = hdfsOpenFileBuilderBuild(builder);
+
+     readFile = hdfsOpenFileFutureGet(future);
+     // hdfsOpenFileFutureCancel returns 0 only when the cancel took effect,
+     // which must not happen once the future has already completed.
+     if (!hdfsOpenFileFutureCancel(future, 0)) {
+         fprintf(stderr, "Cancel on a completed Future should return "
+                 "false");
+         shutdown_and_exit(cl, -1);
+     }
+     hdfsOpenFileFutureFree(future);
+
+     memset(buffer, 0, sizeof(buffer));
+     num_read_bytes = hdfsRead(fs, readFile, (void *) buffer,
+                               sizeof(buffer));
+     if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+         fprintf(stderr,
+                 "Failed to read. Expected %s but got %s (%d bytes)\n",
+                 fileContents, buffer, num_read_bytes);
+         shutdown_and_exit(cl, -1);
+     }
+     hdfsCloseFile(fs, readFile);
+
+     // Same sequence again, this time waiting with a timeout.
+     builder = hdfsOpenFileBuilderAlloc(fs, readPath);
+     hdfsOpenFileBuilderOpt(builder, "hello", "world");
+
+     future = hdfsOpenFileBuilderBuild(builder);
+
+     readFile = hdfsOpenFileFutureGetWithTimeout(future, 1, jDays);
+     if (!hdfsOpenFileFutureCancel(future, 0)) {
+         fprintf(stderr, "Cancel on a completed Future should return "
+                 "false");
+         shutdown_and_exit(cl, -1);
+     }
+     hdfsOpenFileFutureFree(future);
+
+     memset(buffer, 0, sizeof(buffer));
+     num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
+                               sizeof(buffer));
+     if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+         fprintf(stderr, "Failed to read. Expected %s but got "
+                 "%s (%d bytes)\n", fileContents, buffer,
+                 num_read_bytes);
+         shutdown_and_exit(cl, -1);
+     }
+     // Clear the whole buffer. The previous length expression,
+     // strlen(fileContents + 1), measured the string starting at its second
+     // character and so left the tail of the data in place.
+     memset(buffer, 0, sizeof(buffer));
+     hdfsCloseFile(fs, readFile);
+ }
+
totalResult = 0;
result = 0;
{
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
index 60f2826c741..ed150925cdb 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
@@ -38,6 +38,10 @@
#define KERBEROS_TICKET_CACHE_PATH "hadoop.security.kerberos.ticket.cache.path"
+// StreamCapability flags taken from o.a.h.fs.StreamCapabilities
+#define IS_READ_BYTE_BUFFER_CAPABILITY "in:readbytebuffer"
+#define IS_PREAD_BYTE_BUFFER_CAPABILITY "in:preadbytebuffer"
+
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
@@ -1075,6 +1079,27 @@ done:
return 0;
}
+/**
+ * Records on an hdfsFile which byte-buffer read paths its underlying Java
+ * stream advertises through StreamCapabilities.
+ *
+ * Matching bits are OR-ed into file->flags; bits that are already set are
+ * left as they are.
+ *
+ * @param file  handle whose flags field is updated
+ * @param jFile Java stream that is queried via hdfsHasStreamCapability
+ */
+static void setFileFlagCapabilities(hdfsFile file, jobject jFile) {
+    int supported = 0;
+
+    // "in:readbytebuffer" advertises direct reads
+    if (hdfsHasStreamCapability(jFile, IS_READ_BYTE_BUFFER_CAPABILITY)) {
+        supported |= HDFS_FILE_SUPPORTS_DIRECT_READ;
+    }
+
+    // "in:preadbytebuffer" advertises direct positional reads
+    if (hdfsHasStreamCapability(jFile, IS_PREAD_BYTE_BUFFER_CAPABILITY)) {
+        supported |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+    }
+
+    file->flags |= supported;
+}
+
static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
int32_t bufferSize, int16_t replication, int64_t blockSize)
{
@@ -1245,17 +1270,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
file->flags = 0;
if ((flags & O_WRONLY) == 0) {
- // Check the StreamCapabilities of jFile to see if we can do direct
- // reads
- if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
- file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
- }
-
- // Check the StreamCapabilities of jFile to see if we can do direct
- // preads
- if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
- file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
- }
+ setFileFlagCapabilities(file, jFile);
}
ret = 0;
@@ -1288,6 +1303,469 @@ hdfsFile hdfsStreamBuilderBuild(struct hdfsStreamBuilder *bld)
return file;
}
+/**
+ * A wrapper around o.a.h.fs.FutureDataInputStreamBuilder and the file name
+ * associated with the builder.
+ *
+ * jBuilder holds a JNI global reference (created with NewGlobalRef and
+ * released with DeleteGlobalRef). path is the pointer handed to
+ * hdfsOpenFileBuilderAlloc; it is stored as-is, not copied, and is only used
+ * when formatting error messages.
+ */
+struct hdfsOpenFileBuilder {
+ jobject jBuilder;
+ const char *path;
+};
+
+/**
+ * A wrapper around a java.util.concurrent.Future (created by calling
+ * FutureDataInputStreamBuilder#build) and the file name associated with the
+ * builder.
+ *
+ * jFuture holds a JNI global reference (created with NewGlobalRef and
+ * released with DeleteGlobalRef). path is copied by pointer from the builder
+ * the future was built from; the string itself is not duplicated.
+ */
+struct hdfsOpenFileFuture {
+ jobject jFuture;
+ const char *path;
+};
+
+/**
+ * Allocates an hdfsOpenFileBuilder by calling FileSystem#openFile(Path) and
+ * pinning the returned Java builder with a JNI global reference.
+ *
+ * The path pointer is stored in the builder without being copied, so the
+ * string must stay valid for as long as the builder (and any future built
+ * from it) is in use.
+ *
+ * @param fs   The filesystem handle.
+ * @param path The path of the file to open.
+ * @return     The new builder, or NULL on error with errno set.
+ */
+hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+        const char *path) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+    jobject jFS = (jobject) fs;
+
+    jobject jPath = NULL;
+    jobject jBuilder = NULL;
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    hdfsOpenFileBuilder *builder;
+    builder = calloc(1, sizeof(hdfsOpenFileBuilder));
+    if (!builder) {
+        fprintf(stderr, "hdfsOpenFileBuilderAlloc(%s): OOM when creating "
+                "hdfsOpenFileBuilder\n", path);
+        ret = ENOMEM;
+        goto done;
+    }
+    builder->path = path;
+
+    jthr = constructNewObjectOfPath(env, path, &jPath);
+    if (jthr) {
+        // The error code must go through ret: the cleanup below only
+        // discards the half-built builder when ret is non-zero.
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): constructNewObjectOfPath",
+                path);
+        goto done;
+    }
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, jFS, JC_FILE_SYSTEM,
+            "openFile", JMETHOD1(JPARAM(HADOOP_PATH), JPARAM(HADOOP_FDISB)),
+            jPath);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): %s#openFile(Path) failed",
+                path, HADOOP_FS);
+        goto done;
+    }
+    jBuilder = jVal.l;
+
+    builder->jBuilder = (*env)->NewGlobalRef(env, jBuilder);
+    if (!builder->jBuilder) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderAlloc(%s): NewGlobalRef(%s) failed", path,
+                HADOOP_FDISB);
+        ret = EINVAL;
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jPath);
+    destroyLocalReference(env, jBuilder);
+    if (ret) {
+        if (builder) {
+            if (builder->jBuilder) {
+                (*env)->DeleteGlobalRef(env, builder->jBuilder);
+            }
+            free(builder);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return builder;
+}
+
+/**
+ * Used internally by hdfsOpenFileBuilderWithOption to switch between
+ * FSBuilder#must and #opt: "must" makes it invoke the Java method named
+ * "must", and "opt" the Java method named "opt".
+ */
+typedef enum { must, opt } openFileBuilderOptionType;
+
+/**
+ * Shared implementation of hdfsOpenFileBuilderMust and hdfsOpenFileBuilderOpt
+ * that switches between each method depending on the value of
+ * openFileBuilderOptionType.
+ *
+ * The Java object returned by must/opt replaces builder->jBuilder: a new
+ * global reference is taken on it and the previous global reference is
+ * deleted. On failure the builder is left untouched and is NOT freed.
+ *
+ * @param builder    builder previously returned by hdfsOpenFileBuilderAlloc
+ * @param key        option key, converted to a Java String
+ * @param value      option value, converted to a Java String
+ * @param optionType selects the "must" or the "opt" Java method
+ * @return the same builder pointer on success; NULL on failure with errno
+ *         set (EINVAL for a NULL/empty builder, EINTERNAL when no JNIEnv
+ *         is available or optionType is unknown)
+ */
+static hdfsOpenFileBuilder *hdfsOpenFileBuilderWithOption(
+ hdfsOpenFileBuilder *builder, const char *key,
+ const char *value, openFileBuilderOptionType optionType) {
+ int ret = 0;
+ jthrowable jthr;
+ jvalue jVal;
+ jobject localJBuilder = NULL;
+ jobject globalJBuilder;
+ jstring jKeyString = NULL;
+ jstring jValueString = NULL;
+
+ // Reject a NULL builder or one that does not hold a Java builder, i.e.
+ // one that did not come from a successful hdfsOpenFileBuilderAlloc
+ if (builder == NULL || builder->jBuilder == NULL) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ JNIEnv *env = getJNIEnv();
+ if (!env) {
+ errno = EINTERNAL;
+ return NULL;
+ }
+ jthr = newJavaStr(env, key, &jKeyString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
+ builder->path, key);
+ goto done;
+ }
+ jthr = newJavaStr(env, value, &jValueString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsOpenFileBuilderWithOption(%s): newJavaStr(%s)",
+ builder->path, value);
+ goto done;
+ }
+
+ const char *optionTypeMethodName;
+ switch (optionType) {
+ case must:
+ optionTypeMethodName = "must";
+ break;
+ case opt:
+ optionTypeMethodName = "opt";
+ break;
+ default:
+ ret = EINTERNAL;
+ goto done;
+ }
+
+ jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
+ JC_FUTURE_DATA_IS_BUILDER, optionTypeMethodName,
+ JMETHOD2(JPARAM(JAVA_STRING), JPARAM(JAVA_STRING),
+ JPARAM(HADOOP_FS_BLDR)), jKeyString,
+ jValueString);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsOpenFileBuilderWithOption(%s): %s#%s(%s, %s) failed",
+ builder->path, HADOOP_FS_BLDR, optionTypeMethodName, key,
+ value);
+ goto done;
+ }
+
+ localJBuilder = jVal.l; // object returned by the must/opt call
+ globalJBuilder = (*env)->NewGlobalRef(env, localJBuilder);
+ if (!globalJBuilder) {
+ printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "hdfsOpenFileBuilderWithOption(%s): NewGlobalRef(%s) failed",
+ builder->path, HADOOP_FDISB);
+ ret = EINVAL;
+ goto done;
+ }
+ (*env)->DeleteGlobalRef(env, builder->jBuilder); // old reference is replaced only after the new one exists
+ builder->jBuilder = globalJBuilder;
+
+done:
+ destroyLocalReference(env, jKeyString);
+ destroyLocalReference(env, jValueString);
+ destroyLocalReference(env, localJBuilder);
+ if (ret) {
+ errno = ret;
+ return NULL;
+ }
+ return builder;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
+        const char *key, const char *value) {
+    // Mandatory option: routed to the Java builder's "must" method.
+    return hdfsOpenFileBuilderWithOption(builder, key, value, must);
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
+        const char *key, const char *value) {
+    // Optional option: routed to the Java builder's "opt" method.
+    return hdfsOpenFileBuilderWithOption(builder, key, value, opt);
+}
+
+/**
+ * Calls FutureDataInputStreamBuilder#build() and wraps the returned Java
+ * future in a newly allocated hdfsOpenFileFuture.
+ *
+ * Once the arguments and the JNI environment have been validated, the
+ * builder is freed by this call on both the success and the failure path.
+ * When the builder is rejected as invalid, or no JNIEnv is available, the
+ * builder is left untouched.
+ *
+ * @param builder builder previously returned by hdfsOpenFileBuilderAlloc
+ * @return        the new future, or NULL on error with errno set
+ */
+hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+
+    jobject jFuture = NULL;
+
+    // If the builder was not previously created by a prior call to
+    // hdfsOpenFileBuilderAlloc then exit. errno has to be set directly
+    // here because this path does not run the shared cleanup code.
+    if (builder == NULL || builder->jBuilder == NULL) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    hdfsOpenFileFuture *future;
+    future = calloc(1, sizeof(hdfsOpenFileFuture));
+    if (!future) {
+        fprintf(stderr, "hdfsOpenFileBuilderBuild: OOM when creating "
+                "hdfsOpenFileFuture\n");
+        ret = ENOMEM;
+        goto done;
+    }
+    // The future borrows the same path pointer as the builder; the string
+    // is not owned by either struct, so it survives the builder's release.
+    future->path = builder->path;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, builder->jBuilder,
+            JC_FUTURE_DATA_IS_BUILDER, "build",
+            JMETHOD1("", JPARAM(JAVA_CFUTURE)));
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderBuild(%s): %s#build() failed",
+                builder->path, HADOOP_FDISB);
+        goto done;
+    }
+    jFuture = jVal.l;
+
+    future->jFuture = (*env)->NewGlobalRef(env, jFuture);
+    if (!future->jFuture) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileBuilderBuild(%s): NewGlobalRef(%s) failed",
+                builder->path, JAVA_CFUTURE);
+        ret = EINVAL;
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jFuture);
+    // The builder is consumed regardless of the outcome.
+    hdfsOpenFileBuilderFree(builder);
+    if (ret) {
+        if (future) {
+            if (future->jFuture) {
+                (*env)->DeleteGlobalRef(env, future->jFuture);
+            }
+            free(future);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return future;
+}
+
+/**
+ * Releases a builder: deletes the JNI global reference it holds (when a
+ * JNIEnv can be obtained) and frees the C structure.
+ *
+ * Passing NULL is a no-op. The C structure is freed even when no JNIEnv is
+ * available, since the caller has no way to detect the failure and retry.
+ *
+ * @param builder the builder to free; must not be used afterwards
+ */
+void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
+    JNIEnv *env;
+
+    if (!builder) {
+        return;
+    }
+    env = getJNIEnv();
+    if (env && builder->jBuilder) {
+        (*env)->DeleteGlobalRef(env, builder->jBuilder);
+        builder->jBuilder = NULL;
+    }
+    free(builder);
+}
+
+/**
+ * Shared implementation of hdfsOpenFileFutureGet and
+ * hdfsOpenFileFutureGetWithTimeout. If jTimeUnit is NULL, calls Future#get()
+ * and ignores timeout; otherwise calls Future#get(long, TimeUnit).
+ *
+ * The jTimeUnit local reference is released by this function on every path
+ * that gets past the JNIEnv check. The future itself is not freed.
+ *
+ * @param future    future returned by hdfsOpenFileBuilderBuild
+ * @param timeout   maximum time to wait, in units of jTimeUnit
+ * @param jTimeUnit java.util.concurrent.TimeUnit instance, or NULL to wait
+ *                  without a timeout
+ * @return          the opened input file, or NULL on error with errno set
+ */
+static hdfsFile fileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, jobject jTimeUnit) {
+    int ret = 0;
+    jthrowable jthr;
+    jvalue jVal;
+
+    hdfsFile file = NULL;
+    jobject jFile = NULL;
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        // errno must be set directly: this path skips the shared cleanup.
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    if (future == NULL || future->jFuture == NULL) {
+        ret = EINVAL;
+        goto done;
+    }
+
+    if (!jTimeUnit) {
+        jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
+                JC_CFUTURE, "get", JMETHOD1("", JPARAM(JAVA_OBJECT)));
+    } else {
+        jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture,
+                JC_CFUTURE, "get", JMETHOD2("J",
+                JPARAM(JAVA_TIMEUNIT), JPARAM(JAVA_OBJECT)), timeout,
+                jTimeUnit);
+    }
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGet(%s): %s#get failed", future->path,
+                JAVA_CFUTURE);
+        goto done;
+    }
+
+    file = calloc(1, sizeof(struct hdfsFile_internal));
+    if (!file) {
+        fprintf(stderr, "hdfsOpenFileFutureGet(%s): OOM when creating "
+                "hdfsFile\n", future->path);
+        ret = ENOMEM;
+        goto done;
+    }
+    jFile = jVal.l;
+    file->file = (*env)->NewGlobalRef(env, jFile);
+    if (!file->file) {
+        ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGet(%s): NewGlobalRef(jFile) failed",
+                future->path);
+        goto done;
+    }
+
+    file->type = HDFS_STREAM_INPUT;
+    file->flags = 0;
+
+    setFileFlagCapabilities(file, jFile);
+
+done:
+    destroyLocalReference(env, jTimeUnit);
+    destroyLocalReference(env, jFile);
+    if (ret) {
+        if (file) {
+            if (file->file) {
+                (*env)->DeleteGlobalRef(env, file->file);
+            }
+            free(file);
+        }
+        errno = ret;
+        return NULL;
+    }
+    return file;
+}
+
+hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
+    // A NULL time unit selects the untimed Future#get(); the timeout value
+    // is then not passed to Java at all.
+    const int64_t unusedTimeout = -1;
+
+    return fileFutureGetWithTimeout(future, unusedTimeout, NULL);
+}
+
+/**
+ * Waits for the asynchronous open to complete by calling
+ * Future#get(long, TimeUnit) with the TimeUnit constant that corresponds to
+ * timeUnit.
+ *
+ * @param future   future returned by hdfsOpenFileBuilderBuild
+ * @param timeout  maximum time to wait, in units of timeUnit
+ * @param timeUnit unit of timeout
+ * @return         the opened input file, or NULL on error with errno set
+ *                 (EINVAL for a NULL/empty future, EINTERNAL when no JNIEnv
+ *                 is available or timeUnit is not a known constant)
+ */
+hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, javaConcurrentTimeUnit timeUnit) {
+    int ret = 0;
+    jthrowable jthr;
+    jobject jTimeUnit = NULL;
+
+    if (future == NULL || future->jFuture == NULL) {
+        errno = EINVAL;
+        return NULL;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+
+    const char *timeUnitEnumName;
+    switch (timeUnit) {
+    case jNanoseconds:
+        timeUnitEnumName = "NANOSECONDS";
+        break;
+    case jMicroseconds:
+        timeUnitEnumName = "MICROSECONDS";
+        break;
+    case jMilliseconds:
+        timeUnitEnumName = "MILLISECONDS";
+        break;
+    case jSeconds:
+        timeUnitEnumName = "SECONDS";
+        break;
+    case jMinutes:
+        timeUnitEnumName = "MINUTES";
+        break;
+    case jHours:
+        timeUnitEnumName = "HOURS";
+        break;
+    case jDays:
+        timeUnitEnumName = "DAYS";
+        break;
+    default:
+        ret = EINTERNAL;
+        goto done;
+    }
+
+    jthr = fetchEnumInstance(env, JAVA_TIMEUNIT, timeUnitEnumName, &jTimeUnit);
+
+    if (jthr) {
+        // Report the step that actually failed (the enum lookup), not
+        // Future#get, which has not been called yet.
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureGetWithTimeout(%s): "
+                "fetchEnumInstance(%s#%s) failed", future->path,
+                JAVA_TIMEUNIT, timeUnitEnumName);
+        goto done;
+    }
+    // Ownership of the jTimeUnit local reference passes to the callee.
+    return fileFutureGetWithTimeout(future, timeout, jTimeUnit);
+
+done:
+    if (ret) {
+        errno = ret;
+    }
+    return NULL;
+}
+
+/**
+ * Calls Future#cancel(boolean) on the Java future.
+ *
+ * @param future                future returned by hdfsOpenFileBuilderBuild
+ * @param mayInterruptIfRunning non-zero is passed to Java as true, zero as
+ *                              false
+ * @return 0 if Future#cancel returned true; -1 if it returned false or if
+ *         the call failed (errno is set only in the failure case)
+ */
+int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+        int mayInterruptIfRunning) {
+    int ret;
+    jthrowable jthr;
+    jvalue jVal;
+
+    jboolean jMayInterruptIfRunning;
+
+    if (future == NULL || future->jFuture == NULL) {
+        errno = EINVAL;
+        return -1;
+    }
+
+    JNIEnv *env = getJNIEnv();
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    jMayInterruptIfRunning = mayInterruptIfRunning ? JNI_TRUE : JNI_FALSE;
+    jthr = invokeMethod(env, &jVal, INSTANCE, future->jFuture, JC_CFUTURE,
+            "cancel", JMETHOD1("Z", "Z"), jMayInterruptIfRunning);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsOpenFileFutureCancel(%s): %s#cancel failed", future->path,
+                JAVA_CFUTURE);
+        if (ret) {
+            errno = ret;
+            return -1;
+        }
+    }
+
+    if (!jVal.z) {
+        return -1;
+    }
+    return 0;
+}
+
+/**
+ * Releases a future: deletes the JNI global reference it holds (when a
+ * JNIEnv can be obtained) and frees the C structure.
+ *
+ * Passing NULL is a no-op. The C structure is freed even when no JNIEnv is
+ * available, since the caller has no way to detect the failure and retry.
+ *
+ * @param future the future to free; must not be used afterwards
+ */
+void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
+    JNIEnv *env;
+
+    if (!future) {
+        return;
+    }
+    env = getJNIEnv();
+    if (env && future->jFuture) {
+        (*env)->DeleteGlobalRef(env, future->jFuture);
+        future->jFuture = NULL;
+    }
+    free(future);
+}
+
int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength)
{
jobject jFS = (jobject)fs;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
index e58a6232d20..eba50ff6eb2 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
@@ -82,6 +82,29 @@ extern "C" {
} tObjectKind;
struct hdfsStreamBuilder;
+ /**
+ * The C reflection of the enum values from java.util.concurrent.TimeUnit.
+ * Each constant selects the TimeUnit constant of the same name
+ * (jNanoseconds -> NANOSECONDS, ..., jDays -> DAYS) when passed to
+ * hdfsOpenFileFutureGetWithTimeout.
+ */
+ typedef enum javaConcurrentTimeUnit {
+ jNanoseconds,
+ jMicroseconds,
+ jMilliseconds,
+ jSeconds,
+ jMinutes,
+ jHours,
+ jDays,
+ } javaConcurrentTimeUnit;
+
+ /**
+ * The C reflection of java.util.concurrent.Future specifically used for
+ * opening HDFS files asynchronously. Opaque handle: obtained from
+ * hdfsOpenFileBuilderBuild and released with hdfsOpenFileFutureFree.
+ */
+ typedef struct hdfsOpenFileFuture hdfsOpenFileFuture;
+
+ /**
+ * The C reflection of o.a.h.fs.FutureDataInputStreamBuilder. Opaque
+ * handle: obtained from hdfsOpenFileBuilderAlloc.
+ */
+ typedef struct hdfsOpenFileBuilder hdfsOpenFileBuilder;
/**
* The C reflection of org.apache.org.hadoop.FileSystem .
@@ -429,6 +452,118 @@ extern "C" {
hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
int bufferSize, short replication, tSize blocksize);
+ /**
+ * hdfsOpenFileBuilderAlloc - Allocate a HDFS open file builder.
+ *
+ * The path pointer is stored in the builder without copying the string,
+ * so the string must remain valid while the builder, and any future
+ * built from it, is in use.
+ *
+ * @param fs The configured filesystem handle.
+ * @param path The full path to the file.
+ * @return Returns the hdfsOpenFileBuilder, or NULL on error.
+ */
+ LIBHDFS_EXTERNAL
+ hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+ const char *path);
+
+ /**
+ * hdfsOpenFileBuilderMust - Specifies a mandatory parameter for the open
+ * file builder. While the underlying FsBuilder supports various
+ * types for the value (boolean, int, float, double), currently only
+ * strings are supported.
+ *
+ * On error the builder is not freed; the caller still owns it.
+ *
+ * @param builder The open file builder to set the config for.
+ * @param key The config key
+ * @param value The config value
+ * @return Returns the hdfsOpenFileBuilder, or NULL on error (errno is
+ * set).
+ */
+ LIBHDFS_EXTERNAL
+ hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(hdfsOpenFileBuilder *builder,
+ const char *key, const char *value);
+
+ /**
+ * hdfsOpenFileBuilderOpt - Specifies an optional parameter for the open
+ * file builder. While the underlying FsBuilder supports various
+ * types for the value (boolean, int, float, double), currently only
+ * strings are supported.
+ *
+ * On error the builder is not freed; the caller still owns it.
+ *
+ * @param builder The open file builder to set the config for.
+ * @param key The config key
+ * @param value The config value
+ * @return Returns the hdfsOpenFileBuilder, or NULL on error (errno is
+ * set).
+ */
+ LIBHDFS_EXTERNAL
+ hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(hdfsOpenFileBuilder *builder,
+ const char *key, const char *value);
+
+ /**
+ * hdfsOpenFileBuilderBuild - Builds the open file builder and returns a
+ * hdfsOpenFileFuture which tracks the asynchronous call to open the
+ * specified file.
+ *
+ * The builder is freed by this call, whether the build succeeds or
+ * fails, once the builder and the JNI environment have been validated;
+ * it must not be used or freed again afterwards. The returned future
+ * must be released with hdfsOpenFileFutureFree.
+ *
+ * @param builder The open file builder to build.
+ * @return Returns the hdfsOpenFileFuture, or NULL on error.
+ */
+ LIBHDFS_EXTERNAL
+ hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(hdfsOpenFileBuilder *builder);
+
+ /**
+ * hdfsOpenFileBuilderFree - Free a HDFS open file builder.
+ *
+ * It is normally not necessary to call this function since
+ * hdfsOpenFileBuilderBuild frees the builder. Calling it on a builder
+ * that hdfsOpenFileBuilderBuild has already freed would free the same
+ * memory twice.
+ *
+ * @param builder The hdfsOpenFileBuilder to free.
+ */
+ LIBHDFS_EXTERNAL
+ void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder);
+
+ /**
+ * hdfsOpenFileFutureGet - Call Future#get() on the underlying Java Future
+ * object. A call to #get() will block until the asynchronous operation has
+ * completed. In this case, until the open file call has completed. This
+ * method has no timeout: it blocks until the open call completes.
+ *
+ * The future is not freed by this call; release it with
+ * hdfsOpenFileFutureFree.
+ *
+ * @param future The hdfsOpenFileFuture to call #get on
+ * @return Returns the opened hdfsFile, or NULL on error.
+ */
+ LIBHDFS_EXTERNAL
+ hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future);
+
+ /**
+ * hdfsOpenFileFutureGetWithTimeout - Call Future#get(long, TimeUnit) on
+ * the underlying Java Future object. A call to #get(long, TimeUnit) will
+ * block until the asynchronous operation has completed (in this case,
+ * until the open file call has completed) or the specified timeout has
+ * been reached.
+ *
+ * The future is not freed by this call; release it with
+ * hdfsOpenFileFutureFree.
+ *
+ * @param future The hdfsOpenFileFuture to call #get on
+ * @param timeout The maximum time to wait, expressed in timeUnit units
+ * @param timeUnit The unit of the timeout argument
+ * @return Returns the opened hdfsFile, or NULL on error or if the timeout
+ * has been reached.
+ */
+ LIBHDFS_EXTERNAL
+ hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+ int64_t timeout, javaConcurrentTimeUnit timeUnit);
+
+ /**
+ * hdfsOpenFileFutureCancel - Call Future#cancel(boolean) on the
+ * underlying Java Future object. The value of mayInterruptIfRunning
+ * controls whether the Java thread running the Future should be
+ * interrupted or not.
+ *
+ * @param future The hdfsOpenFileFuture to call #cancel on
+ * @param mayInterruptIfRunning if true, interrupts the running thread
+ * @return Returns 0 if Future#cancel reported that the task was
+ * cancelled, else -1 (including when the call itself fails)
+ */
+ LIBHDFS_EXTERNAL
+ int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+ int mayInterruptIfRunning);
+
+ /**
+ * hdfsOpenFileFutureFree - Free a HDFS open file future.
+ *
+ * @param future The hdfsOpenFileFuture to free.
+ */
+ LIBHDFS_EXTERNAL
+ void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future);
+
/**
* hdfsStreamBuilderAlloc - Allocate an HDFS stream builder.
*
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.c
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.c
index cf880e91b75..9f589ac257a 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.c
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.c
@@ -98,6 +98,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
"org/apache/hadoop/hdfs/ReadStatistics";
cachedJavaClasses[JC_HDFS_DATA_INPUT_STREAM].className =
"org/apache/hadoop/hdfs/client/HdfsDataInputStream";
+ cachedJavaClasses[JC_FUTURE_DATA_IS_BUILDER].className =
+ "org/apache/hadoop/fs/FutureDataInputStreamBuilder";
cachedJavaClasses[JC_DOMAIN_SOCKET].className =
"org/apache/hadoop/net/unix/DomainSocket";
cachedJavaClasses[JC_URI].className =
@@ -108,6 +110,8 @@ jthrowable initCachedClasses(JNIEnv* env) {
"java/util/EnumSet";
cachedJavaClasses[JC_EXCEPTION_UTILS].className =
"org/apache/commons/lang3/exception/ExceptionUtils";
+ cachedJavaClasses[JC_CFUTURE].className =
+ "java/util/concurrent/CompletableFuture";
// Create and set the jclass objects based on the class names set above
jthrowable jthr;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.h
index 92cdd542e23..0b174e1fecc 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.h
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/jclasses.h
@@ -54,11 +54,13 @@ typedef enum {
JC_FS_PERMISSION,
JC_READ_STATISTICS,
JC_HDFS_DATA_INPUT_STREAM,
+ JC_FUTURE_DATA_IS_BUILDER,
JC_DOMAIN_SOCKET,
JC_URI,
JC_BYTE_BUFFER,
JC_ENUM_SET,
JC_EXCEPTION_UTILS,
+ JC_CFUTURE,
// A special marker enum that counts the number of cached jclasses
NUM_CACHED_CLASSES
} CachedJavaClass;
@@ -95,6 +97,8 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
#define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
#define HADOOP_RSTAT "org/apache/hadoop/hdfs/ReadStatistics"
#define HADOOP_HDISTRM "org/apache/hadoop/hdfs/client/HdfsDataInputStream"
+#define HADOOP_FDISB "org/apache/hadoop/fs/FutureDataInputStreamBuilder"
+#define HADOOP_FS_BLDR "org/apache/hadoop/fs/FSBuilder"
#define HADOOP_RO "org/apache/hadoop/fs/ReadOption"
#define HADOOP_DS "org/apache/hadoop/net/unix/DomainSocket"
@@ -104,6 +108,9 @@ const char *getClassName(CachedJavaClass cachedJavaClass);
#define JAVA_BYTEBUFFER "java/nio/ByteBuffer"
#define JAVA_STRING "java/lang/String"
#define JAVA_ENUMSET "java/util/EnumSet"
+#define JAVA_CFUTURE "java/util/concurrent/CompletableFuture"
+#define JAVA_TIMEUNIT "java/util/concurrent/TimeUnit"
+#define JAVA_OBJECT "java/lang/Object"
/* Some frequently used third-party class names */
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
index bda27b9a432..2d265b8f03c 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
@@ -250,6 +250,65 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
return ret;
}
+hdfsOpenFileBuilder *hdfsOpenFileBuilderAlloc(hdfsFS fs,
+        const char *path) {
+    // Delegate to libhdfs using the libhdfs handle wrapped by the shim fs.
+    hdfsOpenFileBuilder *allocated =
+        libhdfs_hdfsOpenFileBuilderAlloc(fs->libhdfsRep, path);
+    return allocated;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderMust(
+        hdfsOpenFileBuilder *builder, const char *key,
+        const char *value) {
+    // Pass-through to the libhdfs implementation.
+    hdfsOpenFileBuilder *updated =
+        libhdfs_hdfsOpenFileBuilderMust(builder, key, value);
+    return updated;
+}
+
+hdfsOpenFileBuilder *hdfsOpenFileBuilderOpt(
+        hdfsOpenFileBuilder *builder, const char *key,
+        const char *value) {
+    // Pass-through to the libhdfs implementation.
+    hdfsOpenFileBuilder *updated =
+        libhdfs_hdfsOpenFileBuilderOpt(builder, key, value);
+    return updated;
+}
+
+hdfsOpenFileFuture *hdfsOpenFileBuilderBuild(
+        hdfsOpenFileBuilder *builder) {
+    // Pass-through to the libhdfs implementation.
+    hdfsOpenFileFuture *pending = libhdfs_hdfsOpenFileBuilderBuild(builder);
+    return pending;
+}
+
+void hdfsOpenFileBuilderFree(hdfsOpenFileBuilder *builder) {
+    // Pass-through to the libhdfs implementation.
+    libhdfs_hdfsOpenFileBuilderFree(builder);
+    return;
+}
+
+/**
+ * Shim for hdfsOpenFileFutureGet: wraps the file returned by libhdfs in a
+ * shim hdfsFile whose libhdfspp representation is left unset.
+ *
+ * @param future The future to wait on.
+ * @return The wrapped file, or NULL if the wrapper cannot be allocated or
+ *         libhdfs fails to open the file.
+ */
+hdfsFile hdfsOpenFileFutureGet(hdfsOpenFileFuture *future) {
+    hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
+    // Allocate the wrapper before opening: if the allocation failed after
+    // the open, the already opened libhdfs file would be leaked.
+    if (!ret) {
+        return NULL;
+    }
+    ret->libhdfsppRep = 0;
+    ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGet(future);
+    if (!ret->libhdfsRep) {
+        free(ret);
+        ret = NULL;
+    }
+    return ret;
+}
+
+/**
+ * Shim for hdfsOpenFileFutureGetWithTimeout: wraps the file returned by
+ * libhdfs in a shim hdfsFile whose libhdfspp representation is left unset.
+ *
+ * @param future   The future to wait on.
+ * @param timeout  The maximum time to wait, in timeUnit units.
+ * @param timeUnit The unit of timeout.
+ * @return The wrapped file, or NULL if the wrapper cannot be allocated or
+ *         libhdfs does not return a file.
+ */
+hdfsFile hdfsOpenFileFutureGetWithTimeout(hdfsOpenFileFuture *future,
+        int64_t timeout, javaConcurrentTimeUnit timeUnit) {
+    hdfsFile ret = calloc(1, sizeof(struct hdfsFile_internal));
+    // Allocate the wrapper before opening: if the allocation failed after
+    // the open, the already opened libhdfs file would be leaked.
+    if (!ret) {
+        return NULL;
+    }
+    ret->libhdfsppRep = 0;
+    ret->libhdfsRep = libhdfs_hdfsOpenFileFutureGetWithTimeout(future, timeout,
+                                                               timeUnit);
+    if (!ret->libhdfsRep) {
+        free(ret);
+        ret = NULL;
+    }
+    return ret;
+}
+
+int hdfsOpenFileFutureCancel(hdfsOpenFileFuture *future,
+        int mayInterruptIfRunning) {
+    // Pass-through to the libhdfs implementation.
+    int status = libhdfs_hdfsOpenFileFutureCancel(future,
+                                                  mayInterruptIfRunning);
+    return status;
+}
+
+void hdfsOpenFileFutureFree(hdfsOpenFileFuture *future) {
+    // Pass-through to the libhdfs implementation.
+    libhdfs_hdfsOpenFileFutureFree(future);
+    return;
+}
+
int hdfsTruncateFile(hdfsFS fs, const char* path, tOffset newlength) {
return libhdfs_hdfsTruncateFile(fs->libhdfsRep, path, newlength);
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h
index 0d014341b4c..16574414255 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h
@@ -39,6 +39,23 @@
#define hdfsConfStrFree libhdfs_hdfsConfStrFree
#define hdfsDisconnect libhdfs_hdfsDisconnect
#define hdfsOpenFile libhdfs_hdfsOpenFile
+#define hdfsOpenFileBuilderAlloc libhdfs_hdfsOpenFileBuilderAlloc
+#define hdfsOpenFileBuilderMust libhdfs_hdfsOpenFileBuilderMust
+#define hdfsOpenFileBuilderOpt libhdfs_hdfsOpenFileBuilderOpt
+#define hdfsOpenFileBuilderBuild libhdfs_hdfsOpenFileBuilderBuild
+#define hdfsOpenFileBuilderFree libhdfs_hdfsOpenFileBuilderFree
+#define hdfsOpenFileFutureGet libhdfs_hdfsOpenFileFutureGet
+#define javaConcurrentTimeUnit libhdfs_javaConcurrentTimeUnit
+#define jNanoseconds libhdfs_jNanoseconds
+#define jMicroseconds libhdfs_jMicroseconds
+#define jMilliseconds libhdfs_jMilliseconds
+#define jSeconds libhdfs_jSeconds
+#define jMinutes libhdfs_jMinutes
+#define jHours libhdfs_jHours
+#define jDays libhdfs_jDays
+#define hdfsOpenFileFutureGetWithTimeout libhdfs_hdfsOpenFileFutureGetWithTimeout
+#define hdfsOpenFileFutureCancel libhdfs_hdfsOpenFileFutureCancel
+#define hdfsOpenFileFutureFree libhdfs_hdfsOpenFileFutureFree
#define hdfsTruncateFile libhdfs_hdfsTruncateFile
#define hdfsUnbufferFile libhdfs_hdfsUnbufferFile
#define hdfsCloseFile libhdfs_hdfsCloseFile
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
index d46768c02ad..d84b8ba2875 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
@@ -39,6 +39,23 @@
#undef hdfsConfStrFree
#undef hdfsDisconnect
#undef hdfsOpenFile
+#undef hdfsOpenFileBuilderAlloc
+#undef hdfsOpenFileBuilderMust
+#undef hdfsOpenFileBuilderOpt
+#undef hdfsOpenFileBuilderBuild
+#undef hdfsOpenFileBuilderFree
+#undef hdfsOpenFileFutureGet
+#undef javaConcurrentTimeUnit
+#undef jNanoseconds
+#undef jMicroseconds
+#undef jMilliseconds
+#undef jSeconds
+#undef jMinutes
+#undef jHours
+#undef jDays
+#undef hdfsOpenFileFutureGetWithTimeout
+#undef hdfsOpenFileFutureCancel
+#undef hdfsOpenFileFutureFree
#undef hdfsTruncateFile
#undef hdfsUnbufferFile
#undef hdfsCloseFile
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
index 4b08d0556c3..0a6d987409f 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
+++
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
@@ -39,6 +39,23 @@
#define hdfsConfStrFree libhdfspp_hdfsConfStrFree
#define hdfsDisconnect libhdfspp_hdfsDisconnect
#define hdfsOpenFile libhdfspp_hdfsOpenFile
+#define hdfsOpenFileBuilderAlloc libhdfspp_hdfsOpenFileBuilderAlloc
+#define hdfsOpenFileBuilderMust libhdfspp_hdfsOpenFileBuilderMust
+#define hdfsOpenFileBuilderOpt libhdfspp_hdfsOpenFileBuilderOpt
+#define hdfsOpenFileBuilderBuild libhdfspp_hdfsOpenFileBuilderBuild
+#define hdfsOpenFileBuilderFree libhdfspp_hdfsOpenFileBuilderFree
+#define hdfsOpenFileFutureGet libhdfspp_hdfsOpenFileFutureGet
+#define javaConcurrentTimeUnit libhdfspp_javaConcurrentTimeUnit
+#define jNanoseconds libhdfspp_jNanoseconds
+#define jMicroseconds libhdfspp_jMicroseconds
+#define jMilliseconds libhdfspp_jMilliseconds
+#define jSeconds libhdfspp_jSeconds
+#define jMinutes libhdfspp_jMinutes
+#define jHours libhdfspp_jHours
+#define jDays libhdfspp_jDays
+#define hdfsOpenFileFutureGetWithTimeout libhdfspp_hdfsOpenFileFutureGetWithTimeout
+#define hdfsOpenFileFutureCancel libhdfspp_hdfsOpenFileFutureCancel
+#define hdfsOpenFileFutureFree libhdfspp_hdfsOpenFileFutureFree
#define hdfsTruncateFile libhdfspp_hdfsTruncateFile
#define hdfsUnbufferFile libhdfspp_hdfsUnbufferFile
#define hdfsCloseFile libhdfspp_hdfsCloseFile
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]