This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e340ad5be0 Core: Allow passing identity object through
RESTSessionCatalog (#7088)
e340ad5be0 is described below
commit e340ad5be04e902398c576f431810c3dfa4fe717
Author: David Phillips <[email protected]>
AuthorDate: Mon Mar 13 16:44:51 2023 -0700
Core: Allow passing identity object through RESTSessionCatalog (#7088)
---
.../org/apache/iceberg/catalog/SessionCatalog.java | 20 ++++++++++++
.../java/org/apache/iceberg/rest/RESTCatalog.java | 2 +-
.../apache/iceberg/rest/RESTSessionCatalog.java | 36 +++++++++++-----------
3 files changed, 39 insertions(+), 19 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
index 5e0c0e230f..157a499370 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
@@ -38,6 +38,7 @@ public interface SessionCatalog {
private final String identity;
private final Map<String, String> credentials;
private final Map<String, String> properties;
+ private final Object wrappedIdentity;
public static SessionContext createEmpty() {
return new SessionContext(UUID.randomUUID().toString(), null, null,
ImmutableMap.of());
@@ -48,10 +49,20 @@ public interface SessionCatalog {
String identity,
Map<String, String> credentials,
Map<String, String> properties) {
+ this(sessionId, identity, credentials, properties, null);
+ }
+
+ public SessionContext(
+ String sessionId,
+ String identity,
+ Map<String, String> credentials,
+ Map<String, String> properties,
+ Object wrappedIdentity) {
this.sessionId = sessionId;
this.identity = identity;
this.credentials = credentials;
this.properties = properties;
+ this.wrappedIdentity = wrappedIdentity;
}
/**
@@ -95,6 +106,15 @@ public interface SessionCatalog {
public Map<String, String> properties() {
return properties;
}
+
+ /**
+ * Returns the opaque wrapped identity object.
+ *
+ * @return the wrapped identity
+ */
+ public Object wrappedIdentity() {
+ return wrappedIdentity;
+ }
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 066b62c538..e512f26276 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -58,7 +58,7 @@ public class RESTCatalog implements Catalog,
SupportsNamespaces, Configurable<Ob
public RESTCatalog(
SessionCatalog.SessionContext context,
Function<Map<String, String>, RESTClient> clientBuilder) {
- this.sessionCatalog = new RESTSessionCatalog(clientBuilder);
+ this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 25c0d2d9b9..2b363752b7 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -98,7 +99,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
OAuth2Properties.SAML1_TOKEN_TYPE);
private final Function<Map<String, String>, RESTClient> clientBuilder;
- private Function<Map<String, String>, FileIO> ioBuilder = null;
+ private final BiFunction<SessionContext, Map<String, String>, FileIO>
ioBuilder;
private Cache<String, AuthSession> sessions = null;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
@@ -123,11 +124,15 @@ public class RESTSessionCatalog extends BaseSessionCatalog
}
public RESTSessionCatalog() {
- this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ this(config ->
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(),
null);
}
- RESTSessionCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+ public RESTSessionCatalog(
+ Function<Map<String, String>, RESTClient> clientBuilder,
+ BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder) {
+ Preconditions.checkNotNull(clientBuilder, "Invalid client builder: null");
this.clientBuilder = clientBuilder;
+ this.ioBuilder = ioBuilder;
}
@Override
@@ -188,7 +193,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client, tokenRefreshExecutor(), token,
expiresAtMillis(mergedProps), catalogAuth);
}
- this.io = newFileIO(mergedProps);
+ this.io = newFileIO(SessionContext.createEmpty(), mergedProps);
this.snapshotMode =
SnapshotMode.valueOf(
@@ -203,11 +208,6 @@ public class RESTSessionCatalog extends BaseSessionCatalog
super.initialize(name, mergedProps);
}
- public void setFileIOBuilder(Function<Map<String, String>, FileIO>
newIOBuilder) {
- Preconditions.checkState(null == io, "Cannot set IO builder after calling
initialize");
- this.ioBuilder = newIOBuilder;
- }
-
private AuthSession session(SessionContext context) {
AuthSession session =
sessions.get(
@@ -350,7 +350,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client,
paths.table(loadedIdent),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
tableMetadata);
TableIdentifier tableIdentifier = loadedIdent;
@@ -605,7 +605,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
response.tableMetadata());
return new BaseTable(ops, fullTableName(ident));
@@ -624,7 +624,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
meta);
@@ -675,7 +675,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
base);
@@ -765,9 +765,9 @@ public class RESTSessionCatalog extends BaseSessionCatalog
return String.format("%s.%s", name(), ident);
}
- private FileIO newFileIO(Map<String, String> properties) {
+ private FileIO newFileIO(SessionContext context, Map<String, String>
properties) {
if (null != ioBuilder) {
- return ioBuilder.apply(properties);
+ return ioBuilder.apply(context, properties);
} else {
String ioImpl =
properties.getOrDefault(CatalogProperties.FILE_IO_IMPL,
ResolvingFileIO.class.getName());
@@ -775,14 +775,14 @@ public class RESTSessionCatalog extends BaseSessionCatalog
}
}
- private FileIO tableFileIO(Map<String, String> config) {
- if (config.isEmpty()) {
+ private FileIO tableFileIO(SessionContext context, Map<String, String>
config) {
+ if (config.isEmpty() && ioBuilder == null) {
return io; // reuse client and io since config is the same
}
Map<String, String> fullConf = RESTUtil.merge(properties(), config);
- return newFileIO(fullConf);
+ return newFileIO(context, fullConf);
}
private AuthSession tableSession(Map<String, String> tableConf, AuthSession
parent) {