igorcalabria opened a new issue, #5590: URL: https://github.com/apache/iceberg/issues/5590
### Feature Request / Improvement

Hi everyone, I asked around in Slack and @singhpk234 pointed me towards https://github.com/apache/spark/pull/34072, where Spark rejected the idea of passing DataFrame options via hints, which was a reasonable decision (in my opinion). The funny thing is that in that thread a user proposed using [table valued functions](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-tvf.html) as an alternative, and it matched my initial idea. Instead of using hints to pass read options, one could simply use a table function like this:

```sql
SELECT * FROM incremental_read('table_name', 'snapshot-start', 'snapshot-end')
```

Or something more generic:

```sql
SELECT * FROM with_options('table_name', 'option1-key', 'option1-value', 'option2-key', 'option2-value')
```

I'm terrible with naming things, so please ignore the function names :smile:. This syntax is a bit odd, but it's present in other databases too.

From initial testing, implementing a function for this is surprisingly easy. We just have to [inject a table function](https://spark.apache.org/docs/3.3.0/api/java/org/apache/spark/sql/SparkSessionExtensions.html#injectTableFunction-scala.Tuple3-) that returns an `UnresolvedRelation` with the injected options. Simple proof of concept (using Java, so just ignore the weird Scala conversions):

```java
import java.util.Arrays;
import java.util.HashMap;

import org.apache.spark.sql.SparkSessionExtensions;
import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.unsafe.types.UTF8String;

import scala.Function1;
import scala.Tuple3;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;
import scala.runtime.BoxedUnit;

public class Extensions implements Function1<SparkSessionExtensions, BoxedUnit> {
  @Override
  public BoxedUnit apply(SparkSessionExtensions sparkSessionExtensions) {
    FunctionIdentifier identifier = FunctionIdentifier.apply("incremental_read");
    ExpressionInfo info = new ExpressionInfo("noclass", "incremental_read");
    sparkSessionExtensions.injectTableFunction(Tuple3.apply(identifier, info,
        new Function1<Seq<Expression>, LogicalPlan>() {
          @Override
          public LogicalPlan apply(Seq<Expression> children) {
            if (children.size() != 1) {
              throw new RuntimeException("Wrong number of arguments my dude");
            }
            Expression identifierExpression = children.apply(0);
            if (!identifierExpression.foldable()) {
              throw new RuntimeException("Only constant arguments supported my dude");
            }
            if (!identifierExpression.dataType().sameType(DataTypes.StringType)) {
              throw new RuntimeException("Only string types my dude");
            }
            // Evaluate the constant argument to get the table name, e.g. "db.table"
            String table = ((UTF8String) identifierExpression.eval(InternalRow.empty())).toString();
            Seq<String> tableIdentifier =
                CollectionConverters.CollectionHasAsScala(Arrays.asList(table.split("\\.")))
                    .asScala()
                    .toSeq();
            HashMap<String, String> options = new HashMap<>();
            options.put("start-snapshot-id", "3807160769970787368"); // hard coded for simplicity
            // Resolution picks the options up from the relation, exactly as if they
            // had been passed through the DataFrame reader API
            return new UnresolvedRelation(tableIdentifier, new CaseInsensitiveStringMap(options), false);
          }
        }));
    return BoxedUnit.UNIT;
  }
}
```

The best thing about this is that it doesn't even need changes to Iceberg or Spark. One could load this extension separately, but I do think that incremental reads are a very important part of Iceberg's API and shouldn't be left out of the SQL syntax. A sketch of the generic `with_options` variant is at the end of this message.

I'd love to hear opinions on this strategy, and if (or when) we all agree on something, I could provide a PR for this.

Cheers,

### Query engine

Spark
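For the generic version, here's a minimal, untested sketch of how the same `injectTableFunction` approach could handle arbitrary key/value pairs. The class name `WithOptionsExtensions` and the `constantString` helper are placeholders I made up; the rest follows the proof of concept above:

```java
import java.util.Arrays;
import java.util.HashMap;

import org.apache.spark.sql.SparkSessionExtensions;
import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.unsafe.types.UTF8String;

import scala.Function1;
import scala.Tuple3;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters;
import scala.runtime.BoxedUnit;

// Placeholder name: registers with_options(table, key1, value1, key2, value2, ...)
public class WithOptionsExtensions implements Function1<SparkSessionExtensions, BoxedUnit> {

  // Evaluates a constant string argument, rejecting anything non-foldable or non-string
  private static String constantString(Expression expr) {
    if (!expr.foldable() || !expr.dataType().sameType(DataTypes.StringType)) {
      throw new RuntimeException("Only constant string arguments supported my dude");
    }
    return ((UTF8String) expr.eval(InternalRow.empty())).toString();
  }

  @Override
  public BoxedUnit apply(SparkSessionExtensions extensions) {
    FunctionIdentifier identifier = FunctionIdentifier.apply("with_options");
    ExpressionInfo info = new ExpressionInfo("noclass", "with_options");
    extensions.injectTableFunction(Tuple3.apply(identifier, info,
        new Function1<Seq<Expression>, LogicalPlan>() {
          @Override
          public LogicalPlan apply(Seq<Expression> children) {
            // One table name followed by key/value pairs, so the argument count must be odd
            if (children.size() < 1 || children.size() % 2 == 0) {
              throw new RuntimeException("Usage: with_options(table, key1, value1, ...) my dude");
            }
            String table = constantString(children.apply(0));
            HashMap<String, String> options = new HashMap<>();
            for (int i = 1; i < children.size(); i += 2) {
              options.put(constantString(children.apply(i)), constantString(children.apply(i + 1)));
            }
            Seq<String> tableIdentifier =
                CollectionConverters.CollectionHasAsScala(Arrays.asList(table.split("\\.")))
                    .asScala()
                    .toSeq();
            return new UnresolvedRelation(tableIdentifier, new CaseInsensitiveStringMap(options), false);
          }
        }));
    return BoxedUnit.UNIT;
  }
}
```

With that in place, the incremental read from the first example would just be:

```sql
SELECT * FROM with_options('db.table_name', 'start-snapshot-id', '3807160769970787368')
```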
