jackye1995 commented on a change in pull request #1825:
URL: https://github.com/apache/iceberg/pull/1825#discussion_r556314873
##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -70,12 +74,12 @@
* </p>
*/
public class NessieCatalog extends BaseMetastoreCatalog implements
AutoCloseable, SupportsNamespaces, Configurable {
- private static final Logger logger =
LoggerFactory.getLogger(NessieCatalog.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NessieCatalog.class);
Review comment:
The standard is to name it `LOG`
##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java
##########
@@ -64,31 +87,137 @@ public static TableReference parse(TableIdentifier path) {
*/
public static TableReference parse(String path) {
// I am assuming tables can't have @ or # symbols
- if (path.split("@").length > 2) {
+ if (REF.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
branch in %s", path));
}
- if (path.split("#").length > 2) {
+ if (TIMESTAMP.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
timestamp in %s", path));
}
if (path.contains("@") && path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableRef = REF.splitToList(path);
+ if (tableRef.get(0).contains("#")) {
+ throw new IllegalArgumentException("Invalid table name:" +
+ " # is not allowed before @. Correct format is
table@ref#timestamp");
+ }
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ List<String> timestampRef = TIMESTAMP.splitToList(tableRef.get(1));
+ return new TableReference(identifier,
parseTimestamp(timestampRef.get(1)), timestampRef.get(0));
}
if (path.contains("@")) {
- String[] tableRef = path.split("@");
- TableIdentifier identifier = TableIdentifier.parse(tableRef[0]);
- return new TableReference(identifier, null, tableRef[1]);
+ List<String> tableRef = REF.splitToList(path);
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ return new TableReference(identifier, null, tableRef.get(1));
}
if (path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableTimestamp = TIMESTAMP.splitToList(path);
+ TableIdentifier identifier =
TableIdentifier.parse(tableTimestamp.get(0));
+ return new TableReference(identifier,
parseTimestamp(tableTimestamp.get(1)), null);
}
TableIdentifier identifier = TableIdentifier.parse(path);
return new TableReference(identifier, null, null);
}
+
+ private enum FormatOptions {
Review comment:
Since this only deals with date-time formats, it would probably be better to
call it `DateTimeFormatOptions`.
##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -308,6 +314,36 @@ private UpdateableReference loadReference(String
requestedRef) {
}
}
+ private RefreshableReference loadReferenceAtTime(String requestedRef,
Instant timestamp) {
+ try {
+ Reference ref = requestedRef == null ?
client.getTreeApi().getDefaultBranch()
+ : client.getTreeApi().getReferenceByName(requestedRef);
+ if (ref instanceof Hash) {
+ throw new IllegalArgumentException(String.format("Cannot specify a
hash %s and timestamp %s together. " +
+ "The timestamp is redundant and has been ignored", requestedRef,
timestamp));
+ }
Review comment:
nit: add a space after `if` and `for`.
##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java
##########
@@ -64,31 +87,137 @@ public static TableReference parse(TableIdentifier path) {
*/
public static TableReference parse(String path) {
// I am assuming tables can't have @ or # symbols
- if (path.split("@").length > 2) {
+ if (REF.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
branch in %s", path));
}
- if (path.split("#").length > 2) {
+ if (TIMESTAMP.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
timestamp in %s", path));
}
if (path.contains("@") && path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableRef = REF.splitToList(path);
+ if (tableRef.get(0).contains("#")) {
+ throw new IllegalArgumentException("Invalid table name:" +
+ " # is not allowed before @. Correct format is
table@ref#timestamp");
+ }
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ List<String> timestampRef = TIMESTAMP.splitToList(tableRef.get(1));
+ return new TableReference(identifier,
parseTimestamp(timestampRef.get(1)), timestampRef.get(0));
}
if (path.contains("@")) {
- String[] tableRef = path.split("@");
- TableIdentifier identifier = TableIdentifier.parse(tableRef[0]);
- return new TableReference(identifier, null, tableRef[1]);
+ List<String> tableRef = REF.splitToList(path);
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ return new TableReference(identifier, null, tableRef.get(1));
}
if (path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableTimestamp = TIMESTAMP.splitToList(path);
+ TableIdentifier identifier =
TableIdentifier.parse(tableTimestamp.get(0));
+ return new TableReference(identifier,
parseTimestamp(tableTimestamp.get(1)), null);
}
TableIdentifier identifier = TableIdentifier.parse(path);
return new TableReference(identifier, null, null);
}
+
+ private enum FormatOptions {
+ DATE_TIME(DateTimeFormatter.ISO_DATE_TIME, ZonedDateTime::from),
+ LOCAL_DATE_TIME(DateTimeFormatter.ISO_LOCAL_DATE_TIME, t ->
LocalDateTime.from(t).atZone(sparkTimezoneOrUTC())),
+ LOCAL_DATE(DateTimeFormatter.ISO_LOCAL_DATE, t ->
LocalDate.from(t).atStartOfDay(sparkTimezoneOrUTC()));
+
+ private final DateTimeFormatter formatter;
+ private final TemporalQuery<ZonedDateTime> converter;
+
+ FormatOptions(DateTimeFormatter formatter, TemporalQuery<ZonedDateTime>
converter) {
+ this.formatter = formatter;
+ this.converter = converter;
+ }
+
+ public ZonedDateTime convert(String timestampStr) {
+ try {
+ return formatter.parse(timestampStr, converter);
+ } catch (DateTimeParseException e) {
+ return null;
+ }
+ }
+ }
+
+ private static Instant parseTimestamp(String timestamp) {
+ ZonedDateTime parsed;
+ for (FormatOptions options : FormatOptions.values()) {
+ parsed = options.convert(modifyTimestamp(timestamp));
+ if (parsed != null) {
+ return endOfPeriod(parsed, timestamp).toInstant();
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format("Cannot parse timestamp: %s is not a legal format. (Use
an ISO 8601 compliant string)", timestamp)
+ );
+ }
+
+ private static String modifyTimestamp(String timestamp) {
+ if (timestamp.length() == 7) {
+ // only month. add a day
+ return String.format("%s-01", timestamp);
+ } else if (timestamp.length() == 4) {
+ // only year. add a month and day
+ return String.format("%s-01-01", timestamp);
+ } else {
+ return timestamp;
+ }
+ }
+
+ private static ZonedDateTime endOfPeriod(ZonedDateTime instant, String
timestamp) {
+ if (timestamp.length() == 10) {
+ // only date. Move to end of day
+ return endOfPeriod(instant, ChronoUnit.DAYS);
+ } else if (timestamp.length() == 7) {
+ // only month. Move to end of month
+ return endOfPeriod(instant, ChronoUnit.MONTHS);
+ } else if (timestamp.length() == 4) {
+ // only month. Move to end of month
+ return endOfPeriod(instant, ChronoUnit.YEARS);
+ } else {
+ return instant;
+ }
+ }
+
+ private static ZonedDateTime endOfPeriod(ZonedDateTime instant, ChronoUnit
unit) {
+ return instant.plus(1, unit).minus(1, ChronoUnit.MICROS);
+ }
+
+ private static ZoneId sparkTimezoneOrUTC() {
+ return sparkTimezone().map(ZoneId::of).orElse(UTC);
+ }
+
+ private static Optional<String> sparkTimezone() {
+ if (sparkAvailable != null) {
+ try {
+ sparkSessionMethod = DynMethods.builder("active")
+ .impl("org.apache.spark.sql.SparkSession").buildStatic();
+ sparkContextMethod = DynMethods.builder("sparkContext")
+ .impl("org.apache.spark.sql.SparkSession").build();
+ sparkConfMethod = DynMethods.builder("conf")
+ .impl("org.apache.spark.SparkContext").build();
+ sparkConfGetMethod = DynMethods.builder("get")
+ .impl("org.apache.spark.SparkConf").build();
+ sparkAvailable = true;
+ } catch (RuntimeException e) {
+ sparkAvailable = false; // spark not on classpath
+ }
+ }
+ if (sparkAvailable != null && sparkAvailable) {
Review comment:
`sparkAvailable != null` is redundant because it must be set by L206
or L208. Even if it is not set, `if (sparkAvailable)` should still work.
##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/TableReference.java
##########
@@ -64,31 +87,137 @@ public static TableReference parse(TableIdentifier path) {
*/
public static TableReference parse(String path) {
// I am assuming tables can't have @ or # symbols
- if (path.split("@").length > 2) {
+ if (REF.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
branch in %s", path));
}
- if (path.split("#").length > 2) {
+ if (TIMESTAMP.splitToList(path).size() > 2) {
throw new IllegalArgumentException(String.format("Can only reference one
timestamp in %s", path));
}
if (path.contains("@") && path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableRef = REF.splitToList(path);
+ if (tableRef.get(0).contains("#")) {
+ throw new IllegalArgumentException("Invalid table name:" +
+ " # is not allowed before @. Correct format is
table@ref#timestamp");
+ }
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ List<String> timestampRef = TIMESTAMP.splitToList(tableRef.get(1));
+ return new TableReference(identifier,
parseTimestamp(timestampRef.get(1)), timestampRef.get(0));
}
if (path.contains("@")) {
- String[] tableRef = path.split("@");
- TableIdentifier identifier = TableIdentifier.parse(tableRef[0]);
- return new TableReference(identifier, null, tableRef[1]);
+ List<String> tableRef = REF.splitToList(path);
+ TableIdentifier identifier = TableIdentifier.parse(tableRef.get(0));
+ return new TableReference(identifier, null, tableRef.get(1));
}
if (path.contains("#")) {
- throw new IllegalArgumentException("Invalid table name:" +
- " # is not allowed (reference by timestamp is not supported)");
+ List<String> tableTimestamp = TIMESTAMP.splitToList(path);
+ TableIdentifier identifier =
TableIdentifier.parse(tableTimestamp.get(0));
+ return new TableReference(identifier,
parseTimestamp(tableTimestamp.get(1)), null);
}
TableIdentifier identifier = TableIdentifier.parse(path);
return new TableReference(identifier, null, null);
}
+
+ private enum FormatOptions {
+ DATE_TIME(DateTimeFormatter.ISO_DATE_TIME, ZonedDateTime::from),
+ LOCAL_DATE_TIME(DateTimeFormatter.ISO_LOCAL_DATE_TIME, t ->
LocalDateTime.from(t).atZone(sparkTimezoneOrUTC())),
Review comment:
Looks like we are relying heavily on Spark functionality through
reflection. I am not very familiar with the Nessie module; is this a common
pattern? I feel we should make it more generic for other runtime environments.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]