This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 192e59234 [#4888] improvement(core): Add the implementations for
`getFileLocation` interface in core module (#4891)
192e59234 is described below
commit 192e5923428b43e0ebed072dab932766bea8a78f
Author: xloya <[email protected]>
AuthorDate: Wed Sep 11 13:55:46 2024 +0800
[#4888] improvement(core): Add the implementations for `getFileLocation`
interface in core module (#4891)
### What changes were proposed in this pull request?
Add implementations of the `getFileLocation` interface in the core module.
### Why are the changes needed?
Fix: #4888
### How was this patch tested?
Added unit tests.
---------
Co-authored-by: xiaojiebao <[email protected]>
---
.../catalog/FilesetNormalizeDispatcher.java | 4 +-
.../catalog/FilesetOperationDispatcher.java | 5 +-
.../gravitino/hook/FilesetHookDispatcher.java | 2 +-
.../gravitino/listener/FilesetEventDispatcher.java | 24 ++++++-
.../listener/api/event/GetFileLocationEvent.java | 80 ++++++++++++++++++++++
.../api/event/GetFileLocationFailureEvent.java | 57 +++++++++++++++
.../catalog/TestFilesetOperationDispatcher.java | 51 ++++++++++++++
.../gravitino/connector/TestCatalogOperations.java | 27 +++++++-
.../listener/api/event/TestFilesetEvent.java | 63 +++++++++++++++++
9 files changed, 308 insertions(+), 5 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index 4c87efcee..e1c20de70 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -96,7 +96,9 @@ public class FilesetNormalizeDispatcher implements
FilesetDispatcher {
@Override
public String getFileLocation(NameIdentifier ident, String subPath) {
- throw new UnsupportedOperationException("Not implemented");
+ // Only the case-sensitivity capability is applied here: the name-spec constraints may be
+ // stricter than those of the underlying catalog, so enforcing them could break compatibility.
+ NameIdentifier caseNormalizedIdent = normalizeCaseSensitive(ident);
+ return dispatcher.getFileLocation(caseNormalizedIdent, subPath);
}
private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 73339bb89..98c6311bd 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -208,6 +208,9 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
- throw new UnsupportedOperationException("Not implemented");
+ // Pass the exception type this method declares, so doWithCatalog's rethrow contract matches
+ // the `throws` clause; NonEmptyEntityException is unrelated to a location lookup.
+ return doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithFilesetOps(f -> f.getFileLocation(ident, subPath)),
+ NoSuchFilesetException.class);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
index 9557ecb72..e77801355 100644
--- a/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/hook/FilesetHookDispatcher.java
@@ -97,6 +97,6 @@ public class FilesetHookDispatcher implements
FilesetDispatcher {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
- throw new UnsupportedOperationException("Not implemented");
+ // No hook is applied to location lookups; the call is forwarded unchanged.
+ String fileLocation = dispatcher.getFileLocation(ident, subPath);
+ return fileLocation;
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
index 108220dfc..fd8e6c370 100644
---
a/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/listener/FilesetEventDispatcher.java
@@ -19,9 +19,11 @@
package org.apache.gravitino.listener;
+import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -34,6 +36,8 @@ import
org.apache.gravitino.listener.api.event.CreateFilesetEvent;
import org.apache.gravitino.listener.api.event.CreateFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.DropFilesetEvent;
import org.apache.gravitino.listener.api.event.DropFilesetFailureEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationEvent;
+import org.apache.gravitino.listener.api.event.GetFileLocationFailureEvent;
import org.apache.gravitino.listener.api.event.ListFilesetEvent;
import org.apache.gravitino.listener.api.event.ListFilesetFailureEvent;
import org.apache.gravitino.listener.api.event.LoadFilesetEvent;
@@ -142,6 +146,24 @@ public class FilesetEventDispatcher implements
FilesetDispatcher {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
throws NoSuchFilesetException {
- throw new UnsupportedOperationException("Not implemented");
+ try {
+ String actualFileLocation = dispatcher.getFileLocation(ident, subPath);
+ // Copy the audit info from the thread-local caller context, if any. The holder may be empty
+ // when no caller set a context; dereferencing it unconditionally would turn a successful
+ // lookup into a failure event plus an NPE for the caller.
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ CallerContext callerContext = CallerContext.CallerContextHolder.get();
+ if (callerContext != null && callerContext.context() != null) {
+ builder.putAll(callerContext.context());
+ }
+ eventBus.dispatchEvent(
+ new GetFileLocationEvent(
+ PrincipalUtils.getCurrentUserName(),
+ ident,
+ actualFileLocation,
+ subPath,
+ builder.build()));
+ return actualFileLocation;
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new GetFileLocationFailureEvent(PrincipalUtils.getCurrentUserName(), ident, subPath, e));
+ throw e;
+ }
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
new file mode 100644
index 000000000..8995f7db7
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationEvent.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an event that is generated after the actual location of a file is resolved. */
+@DeveloperApi
+public final class GetFileLocationEvent extends FilesetEvent {
+ private final String actualFileLocation;
+ private final String subPath;
+ private final Map<String, String> context;
+
+ /**
+ * Constructs a new {@code GetFileLocationEvent}, recording a successful attempt to get a file
+ * location.
+ *
+ * @param user The user who requested the file location.
+ * @param identifier The identifier of the fileset whose file location was requested.
+ * @param actualFileLocation The actual file location resolved for the requested sub path.
+ * @param subPath The sub path that was requested.
+ * @param context The audit context; may be null. A non-null map is copied, so later changes to
+ * the caller's map are not reflected in this event.
+ */
+ public GetFileLocationEvent(
+ String user,
+ NameIdentifier identifier,
+ String actualFileLocation,
+ String subPath,
+ Map<String, String> context) {
+ super(user, identifier);
+ this.actualFileLocation = actualFileLocation;
+ this.subPath = subPath;
+ // Read-only snapshot: listeners cannot mutate it and callers cannot change it afterwards.
+ // A HashMap copy (rather than an immutable-map factory) tolerates null keys/values.
+ this.context =
+ context == null ? null : Collections.unmodifiableMap(new HashMap<>(context));
+ }
+
+ /**
+ * Get the actual file location resolved by the get file location operation.
+ *
+ * @return The actual file location.
+ */
+ public String actualFileLocation() {
+ return actualFileLocation;
+ }
+
+ /**
+ * Get the sub path that was requested in the get file location operation.
+ *
+ * @return The requested sub path.
+ */
+ public String subPath() {
+ return subPath;
+ }
+
+ /**
+ * Get the audit context map of the get file location operation.
+ *
+ * @return The read-only audit context map, or null if no context was supplied.
+ */
+ public Map<String, String> context() {
+ return context;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
new file mode 100644
index 000000000..0826f0815
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/GetFileLocationFailureEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * Represents an event that is generated when an attempt to get a file location from the system
+ * fails.
+ */
+@DeveloperApi
+public final class GetFileLocationFailureEvent extends FilesetFailureEvent {
+ private final String subPath;
+
+ /**
+ * Constructs a new {@code GetFileLocationFailureEvent}.
+ *
+ * @param user The user who requested the file location.
+ * @param identifier The identifier of the fileset whose file location was requested.
+ * @param subPath The sub path that was requested.
+ * @param exception The exception that was thrown while getting the file location. This exception
+ * is key to diagnosing the failure, providing insights into what went wrong during the
+ * operation.
+ */
+ public GetFileLocationFailureEvent(
+ String user, NameIdentifier identifier, String subPath, Exception exception) {
+ super(user, identifier, exception);
+ this.subPath = subPath;
+ }
+
+ /**
+ * Get the sub path that was requested in the failed get file location operation.
+ *
+ * @return The requested sub path.
+ */
+ public String subPath() {
+ return subPath;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
index 551e588a5..101ebab07 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetOperationDispatcher.java
@@ -21,8 +21,10 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.StringIdentifier.ID_KEY;
import com.google.common.collect.ImmutableMap;
+import java.io.File;
import java.io.IOException;
import java.util.Map;
+import java.util.UUID;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.file.Fileset;
@@ -169,4 +171,53 @@ public class TestFilesetOperationDispatcher extends
TestOperationDispatcher {
Assertions.assertTrue(dropped);
Assertions.assertFalse(filesetOperationDispatcher.dropFileset(filesetIdent1));
}
+
+ @Test
+ public void testCreateAndGetFileLocation() {
+ String tmpDir = "/tmp/test_get_file_location_" + UUID.randomUUID();
+ try {
+ Namespace filesetNs = Namespace.of(metalake, catalog, "schema1024");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "location", "schema1024");
+ schemaOperationDispatcher.createSchema(
+ NameIdentifier.of(filesetNs.levels()), "comment", props);
+
+ NameIdentifier filesetIdent1 = NameIdentifier.of(filesetNs, "fileset1024");
+ Fileset fileset1 =
+ filesetOperationDispatcher.createFileset(
+ filesetIdent1, "comment", Fileset.Type.MANAGED, tmpDir, props);
+ Assertions.assertEquals("fileset1024", fileset1.name());
+ Assertions.assertEquals("comment", fileset1.comment());
+ testProperties(props, fileset1.properties());
+ Assertions.assertEquals(Fileset.Type.MANAGED, fileset1.type());
+ Assertions.assertNotNull(fileset1.storageLocation());
+
+ // a sub path starting with "/" is appended to the storage location as-is
+ String subPath1 = "/test/test.parquet";
+ String fileLocation1 = filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath1);
+ Assertions.assertEquals(
+ String.format("%s%s", fileset1.storageLocation(), subPath1), fileLocation1);
+
+ // a sub path without a leading "/" gets one inserted
+ String subPath2 = "test/test.parquet";
+ String fileLocation2 = filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath2);
+ Assertions.assertEquals(
+ String.format("%s/%s", fileset1.storageLocation(), subPath2), fileLocation2);
+
+ // a null sub path is rejected
+ String subPath3 = null;
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath3));
+
+ // an empty sub path resolves to the storage location itself
+ String subPath4 = "";
+ String fileLocation4 = filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath4);
+ Assertions.assertEquals(fileset1.storageLocation(), fileLocation4);
+
+ // a whitespace-only (blank) sub path also resolves to the storage location itself
+ String subPath5 = "   ";
+ String fileLocation5 = filesetOperationDispatcher.getFileLocation(filesetIdent1, subPath5);
+ Assertions.assertEquals(fileset1.storageLocation(), fileLocation5);
+ } finally {
+ File path = new File(tmpDir);
+ if (path.exists()) {
+ path.delete();
+ }
+ }
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index 0b73bf4b2..52ce63e23 100644
---
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -18,11 +18,13 @@
*/
package org.apache.gravitino.connector;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
@@ -75,6 +77,8 @@ public class TestCatalogOperations
public static final String FAIL_TEST = "need-fail";
+ private static final String SLASH = "/"; // separator used to join a storage location and a sub path
+
public TestCatalogOperations(Map<String, String> config) {
tables = Maps.newHashMap();
schemas = Maps.newHashMap();
@@ -432,7 +436,28 @@ public class TestCatalogOperations
@Override
public String getFileLocation(NameIdentifier ident, String subPath) {
- throw new UnsupportedOperationException("Not implemented");
+ Preconditions.checkArgument(subPath != null, "subPath must not be null");
+ String trimmedSubPath = subPath.trim();
+ Fileset fileset = loadFileset(ident);
+ String storageLocation = fileset.storageLocation();
+
+ // An empty (after trimming) sub path resolves to the storage location itself.
+ if (StringUtils.isEmpty(trimmedSubPath)) {
+ return storageLocation;
+ }
+
+ // Make the sub path start with SLASH and drop one trailing SLASH from the storage location
+ // before joining the two.
+ String normalizedSubPath =
+ trimmedSubPath.startsWith(SLASH) ? trimmedSubPath : SLASH + trimmedSubPath;
+ String baseLocation =
+ storageLocation.endsWith(SLASH)
+ ? storageLocation.substring(0, storageLocation.length() - 1)
+ : storageLocation;
+ return baseLocation + normalizedSubPath;
}
@Override
diff --git
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
index cd040e243..efc073b19 100644
---
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
+++
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java
@@ -19,15 +19,21 @@
package org.apache.gravitino.listener.api.event;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.audit.CallerContext;
+import org.apache.gravitino.audit.FilesetAuditConstants;
+import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.catalog.FilesetDispatcher;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.file.Fileset;
@@ -121,6 +127,47 @@ public class TestFilesetEvent {
Assertions.assertEquals(namespace, ((ListFilesetEvent) event).namespace());
}
+ @Test
+ void testGetFileLocationEvent() {
+ NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name());
+ dispatcher.createFileset(
+ identifier,
+ fileset.comment(),
+ fileset.type(),
+ fileset.storageLocation(),
+ fileset.properties());
+ Event event = dummyEventListener.popEvent();
+ Assertions.assertEquals(identifier, event.identifier());
+ Assertions.assertEquals(CreateFilesetEvent.class, event.getClass());
+ FilesetInfo filesetInfo = ((CreateFilesetEvent) event).createdFilesetInfo();
+ checkFilesetInfo(filesetInfo, fileset);
+
+ Map<String, String> contextMap = Maps.newHashMap();
+ contextMap.put(
+ FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+ InternalClientType.HADOOP_GVFS.name());
+ contextMap.put(
+ FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
+ FilesetDataOperation.GET_FILE_STATUS.name());
+ CallerContext callerContext = CallerContext.builder().withContext(contextMap).build();
+ CallerContext.CallerContextHolder.set(callerContext);
+ try {
+ String fileLocation = dispatcher.getFileLocation(identifier, "test");
+ Event event1 = dummyEventListener.popEvent();
+ Assertions.assertEquals(identifier, event1.identifier());
+ Assertions.assertEquals(GetFileLocationEvent.class, event1.getClass());
+ String actualFileLocation = ((GetFileLocationEvent) event1).actualFileLocation();
+ Assertions.assertEquals(fileLocation, actualFileLocation);
+ Map<String, String> actualContext = ((GetFileLocationEvent) event1).context();
+ assertEquals(2, actualContext.size());
+ Assertions.assertEquals(
+ InternalClientType.HADOOP_GVFS.name(),
+ actualContext.get(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
+ Assertions.assertEquals(
+ FilesetDataOperation.GET_FILE_STATUS.name(),
+ actualContext.get(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
+ Assertions.assertEquals("test", ((GetFileLocationEvent) event1).subPath());
+ } finally {
+ // The caller context is thread-local; clear it so it does not leak into other tests
+ // running on the same thread.
+ CallerContext.CallerContextHolder.remove();
+ }
+ }
+
@Test
void testCreateSchemaFailureEvent() {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog",
"fileset");
@@ -194,6 +241,20 @@ public class TestFilesetEvent {
Assertions.assertEquals(namespace, ((ListFilesetFailureEvent)
event).namespace());
}
+ @Test
+ void testGetFileLocationFailureEvent() {
+ NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset");
+ Assertions.assertThrowsExactly(
+ GravitinoRuntimeException.class,
+ () -> failureDispatcher.getFileLocation(identifier, "/test"));
+ Event event = dummyEventListener.popEvent();
+ Assertions.assertEquals(identifier, event.identifier());
+ Assertions.assertEquals(GetFileLocationFailureEvent.class, event.getClass());
+ Assertions.assertEquals(
+ GravitinoRuntimeException.class,
+ ((GetFileLocationFailureEvent) event).exception().getClass());
+ // the requested sub path must be carried by the failure event as well
+ Assertions.assertEquals("/test", ((GetFileLocationFailureEvent) event).subPath());
+ }
+
private void checkFilesetInfo(FilesetInfo filesetInfo, Fileset fileset) {
Assertions.assertEquals(fileset.name(), filesetInfo.name());
Assertions.assertEquals(fileset.type(), filesetInfo.type());
@@ -227,6 +288,8 @@ public class TestFilesetEvent {
when(dispatcher.listFilesets(any(Namespace.class))).thenReturn(null);
when(dispatcher.alterFileset(any(NameIdentifier.class),
any(FilesetChange.class)))
.thenReturn(fileset);
+ when(dispatcher.getFileLocation(any(NameIdentifier.class), any()))
+ .thenReturn("file:/test/xxx.parquet");
return dispatcher;
}