Marcus-Rosti opened a new issue, #7790:
URL: https://github.com/apache/gravitino/issues/7790
Say I have two metalakes, metalake_0 and metalake_1
Say I have two buckets to which I want data written for iceberg:
s3://metalake_0 and s3://metalake_1
How do I get spark, the iceberg-rest-catalog, and gravitino to all play
nicely together in this way?
I'm trying to simulate all of this in MinIO, based on the gravitino-playground
okay here's how I create "metalakes"
```scala
/** Request payload for `POST /api/metalakes`.
  *
  * @param name metalake name; the server answers 409 Conflict for a
  *             duplicate name (handled in createMetalake below)
  */
case class Metalake(
name: String
)
/** Creates a metalake via the Gravitino REST API.
  *
  * POSTs the [[Metalake]] payload to `/api/metalakes`. A `409 Conflict`
  * (metalake already present) is logged and swallowed so the setup script
  * stays idempotent; any other failure is re-raised.
  *
  * @param client       http4s client used to issue the request
  * @param metalakeName name of the metalake to create
  */
def createMetalake(client: Client[IO], metalakeName: String): IO[Unit] =
  for {
    _ <- IO.println("Creating metalake")
    _ <- client
      .expect[Unit](
        Request[IO](
          Method.POST,
          uri"http://gravitino:8090/api/metalakes"
        ).withEntity(Metalake(metalakeName).asJson)
          // Consistency fix: the other create* helpers send the versioned
          // Gravitino media type; send it here as well.
          .putHeaders(
            Accept(MediaType.unsafeParse("application/vnd.gravitino.v1+json"))
          )
      )
      .handleErrorWith {
        case e: UnexpectedStatus if e.status == Status.Conflict =>
          IO.println("Metalake already exists. Continuing")
        case e => IO.raiseError(e)
      }
  } yield ()
```
and catalogs...
```scala
/** Request payload for creating a catalog under a metalake.
  *
  * @param name       catalog name
  * @param type       catalog type, e.g. "relational" (see createCatalog)
  * @param provider   catalog provider, e.g. "lakehouse-iceberg"
  * @param properties backend-specific settings (REST URI, warehouse,
  *                   S3 credentials/endpoint, ...)
  */
case class Catalog(
name: String,
`type`: String,
provider: String,
properties: Map[String, String]
)
/** Registers an Iceberg REST catalog under the given metalake, pointing its
  * warehouse at `s3://<metalakeName>`.
  *
  * A `409 Conflict` (catalog already present) is logged and swallowed so
  * repeated runs are idempotent; other errors are logged and re-raised.
  *
  * @param client       http4s client used to issue the request
  * @param metalakeName metalake (and S3 bucket) name
  * @param catalogName  name of the catalog to create; defaults to "mylake"
  *                     to stay backward compatible with existing callers
  */
def createCatalog(
    client: Client[IO],
    metalakeName: String,
    catalogName: String = "mylake"
): IO[Unit] = {
  val catalogProperties = Map(
    "uri" -> "http://gravitino:9002/iceberg",
    "catalog-backend" -> "rest",
    // `s` interpolator: no printf-style format specifiers are used here.
    "warehouse" -> s"s3://$metalakeName",
    "catalog-backend-name" -> metalakeName,
    "io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO",
    "s3-access-key-id" -> "minioadmin",
    "s3-secret-access-key" -> "minioadmin",
    "s3-endpoint" -> "http://minio:9000",
    "s3-region" -> "us-east-1"
  )
  val requestPayload = Catalog(
    name = catalogName,
    `type` = "relational",
    provider = "lakehouse-iceberg",
    properties = catalogProperties
  )
  val acceptHeader =
    Accept(MediaType.unsafeParse("application/vnd.gravitino.v1+json"))
  val request = Request[IO](
    Method.POST,
    uri"http://gravitino:8090".withPath(
      path"api/metalakes" / metalakeName / "catalogs"
    )
  ).withEntity(requestPayload.asJson).putHeaders(acceptHeader)
  for {
    _ <- IO.println("Creating catalog")
    _ <- client
      .expect[String](request)
      .flatMap { successBody =>
        IO.println(s"Successfully created catalog. Response: $successBody")
      }
      .handleErrorWith {
        case e: UnexpectedStatus if e.status == Status.Conflict =>
          IO.println("Catalog already exists")
        case e => IO.println(e).flatMap(_ => IO.raiseError(e))
      }
  } yield ()
}
```
and here's how I make the "schema"
```
/** Request payload for creating a schema inside a catalog.
  *
  * @param name       schema name
  * @param properties schema settings — here the warehouse bucket and the
  *                   schema's S3 location (see createSchema)
  */
case class Schema(
name: String,
properties: Map[String, String]
)
/** Creates a schema under a catalog, with its `location` pinned inside the
  * metalake's S3 bucket (`s3://<metalakeName>/<schemaName>`).
  *
  * A `409 Conflict` (schema already present) is logged and swallowed so
  * repeated runs are idempotent; other errors are logged and re-raised.
  *
  * @param client       http4s client used to issue the request
  * @param metalakeName metalake (and S3 bucket) name
  * @param catalogName  parent catalog; defaults to "mylake" for backward
  *                     compatibility with existing callers
  * @param schemaName   schema to create; defaults to "db" (previous
  *                     hard-coded value)
  */
def createSchema(
    client: Client[IO],
    metalakeName: String,
    catalogName: String = "mylake",
    schemaName: String = "db"
): IO[Unit] = {
  val requestPayload = Schema(
    name = schemaName,
    properties = Map(
      // `s` interpolator: no printf-style format specifiers are used here.
      "warehouse" -> s"s3://$metalakeName",
      "location" -> s"s3://$metalakeName/$schemaName"
    )
  )
  val acceptHeader =
    Accept(MediaType.unsafeParse("application/vnd.gravitino.v1+json"))
  val request = Request[IO](
    Method.POST,
    uri"http://gravitino:8090".withPath(
      path"api/metalakes" / metalakeName / "catalogs" / catalogName /
        "schemas"
    )
  ).withEntity(requestPayload.asJson).putHeaders(acceptHeader)
  for {
    _ <- IO.println(s"Creating schema '$schemaName'...")
    _ <- client
      .expect[String](request)
      .flatMap { successBody =>
        IO.println(s"Successfully created schema. Response: $successBody")
      }
      .handleErrorWith {
        case e: UnexpectedStatus if e.status == Status.Conflict =>
          IO.println("Schema already exists")
        case e => IO.println(e).flatMap(_ => IO.raiseError(e))
      }
  } yield ()
}
```
However, when I go to create any table, it always uses whatever default
warehouse is set in gravitino.conf; and if I remove that default, Gravitino
itself complains that the warehouse parameter is not set...
```
// Build a local Spark session wired to Gravitino via the connector plugin.
val spark = SparkSession
  .builder()
  .appName("create-tables")
  .master("local[*]")
  .config(
    Map(
      // Gravitino connector settings
      "spark.plugins" ->
        "org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin",
      "spark.sql.gravitino.uri" -> "http://gravitino:8090",
      "spark.sql.gravitino.metalake" -> metalakeName,
      "spark.sql.gravitino.enableIcebergSupport" -> "true"
      // Iceberg catalog — the whole entry must be commented out: the original
      // left the value line uncommented, leaving a dangling string expression
      // inside Map(...) (compile error).
      // "spark.sql.catalog.mylake" ->
      //   "org.apache.gravitino.spark.connector.GravitinoSparkCatalog"
    ))
  .getOrCreate()

spark.sql("""use mylake;""")
// spark.sql("""
//   |CREATE DATABASE IF NOT EXISTS db
//   |""".stripMargin)
spark.sql("""use db;""")

// Create nats gameserver events test table.
// Fix: the original DDL had a trailing comma after the last column
// (`time TIMESTAMP,`), which Spark SQL rejects with a ParseException.
spark
  .sql(
    s"""
       |CREATE TABLE IF NOT EXISTS ${config.someEventName} (
       |  session_id STRING,
       |  position_x FLOAT,
       |  position_y FLOAT,
       |  position_z FLOAT,
       |  user_id STRING,
       |  time TIMESTAMP
       |)
       |USING iceberg
       |PARTITIONED BY (
       |  hours(time)
       |);
       |""".stripMargin
  )
  .show()
```
any ideas why this is happening or how to fix it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]