This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c585f066 [SEDONA-370] Remove the old GeoTiff reader (#980)
9c585f066 is described below

commit 9c585f066e5a71652519ae8e8acca876090df9c0
Author: Jia Yu <[email protected]>
AuthorDate: Sun Aug 20 00:15:37 2023 -0700

    [SEDONA-370] Remove the old GeoTiff reader (#980)
---
 R/NAMESPACE                                        |   2 -
 R/R/data_interface.R                               |  49 +-
 R/tests/testthat/test-data-interface-raster.R      | 559 ---------------------
 R/vignettes/articles/raster.Rmd                    |  65 ---
 docs/api/sql/Raster-loader.md                      | 139 +----
 docs/api/sql/Raster-operators.md                   |  19 +-
 docs/api/sql/Raster-writer.md                      | 139 -----
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |   2 -
 .../sql/sedona_sql/expressions/raster/IO.scala     |  99 +---
 .../sedona_sql/expressions/raster/MapAlgebra.scala |  36 --
 .../sql/sedona_sql/io/raster/GeotiffSchema.scala   | 216 --------
 .../sedona_sql/io/raster/ImageReadOptions.scala    |  37 --
 .../sedona_sql/io/raster/ImageWriteOptions.scala   |  36 --
 .../scala/org/apache/sedona/sql/rasterIOTest.scala |  85 ++++
 .../org/apache/sedona/sql/rasteralgebraTest.scala  |  20 -
 ...org.apache.spark.sql.sources.DataSourceRegister |   4 +-
 .../sedona_sql/io/raster/GeotiffFileFormat.scala   | 250 ---------
 .../scala/org/apache/sedona/sql/rasterIOTest.scala | 406 ---------------
 ...org.apache.spark.sql.sources.DataSourceRegister |   4 +-
 .../sedona_sql/io/raster/GeotiffFileFormat.scala   | 250 ---------
 .../scala/org/apache/sedona/sql/rasterIOTest.scala | 406 ---------------
 22 files changed, 106 insertions(+), 2718 deletions(-)

diff --git a/R/NAMESPACE b/R/NAMESPACE
index 16194ad42..83fe86504 100644
--- a/R/NAMESPACE
+++ b/R/NAMESPACE
@@ -28,11 +28,9 @@ export(sedona_write_wkb)
 export(sedona_write_wkt)
 export(spark_read_geojson)
 export(spark_read_geoparquet)
-export(spark_read_geotiff)
 export(spark_read_shapefile)
 export(spark_write_geojson)
 export(spark_write_geoparquet)
-export(spark_write_geotiff)
 export(spark_write_raster)
 export(to_spatial_rdd)
 import(sparklyr)
diff --git a/R/R/data_interface.R b/R/R/data_interface.R
index a7cbfdfa1..09a7ea25f 100644
--- a/R/R/data_interface.R
+++ b/R/R/data_interface.R
@@ -394,7 +394,6 @@ sedona_read_shapefile <- function(sc,
 #' * `spark_read_shapefile`: from a shapefile 
 #' * `spark_read_geojson`: from a geojson file 
 #' * `spark_read_geoparquet`: from a geoparquet file 
-#' * `spark_read_geotiff`: from a GeoTiff file, or a folder containing GeoTiff 
files
 #'
 #' @inheritParams sparklyr::spark_read_source
 #'
@@ -493,29 +492,6 @@ spark_read_geoparquet <- function(sc,
 }
 
 
-#' @export
-#' @rdname spark_read_shapefile
-#' @importFrom sparklyr spark_read_source
-spark_read_geotiff <- function(sc,
-                               name = NULL,
-                               path = name,
-                               options = list(),
-                               repartition = 0,
-                               memory = TRUE,
-                               overwrite = TRUE) {
-  
-  spark_read_source(sc, 
-                    name = name,
-                    path = path,
-                    source = "geotiff",
-                    options = options,
-                    repartition = repartition,
-                    memory = memory,
-                    overwrite = overwrite,
-                    columns = NULL)
-}
-
-
 # ------- Write RDD ------------
 
 
@@ -630,8 +606,7 @@ sedona_save_spatial_rdd <- function(x,
 #' 
 #' * `spark_write_geojson`: to GeoJSON
 #' * `spark_write_geoparquet`: to GeoParquet
-#' * `spark_write_geotiff`: to GeoTiff from Array\[Double\] rasters 
-#' * `spark_write_raster`: to raster tiles after using RS output functions 
(`RS_AsXXX`) 
+#' * `spark_write_raster`: to raster tiles after using RS output functions 
(`RS_AsXXX`)
 #'
 #'
 #' @param path The path to the file. Needs to be accessible from the cluster.
@@ -716,28 +691,6 @@ spark_write_geoparquet <- function(x,
   
 }
 
-#' @export
-#' @rdname spark_write_geojson
-#' @importFrom sparklyr spark_write_source
-spark_write_geotiff <- function(x,
-                                   path,
-                                   mode = NULL,
-                                   options = list(),
-                                   partition_by = NULL,
-                                   ...) {
-  
-  spark_write_source(
-    x = x,
-    source = "geotiff",
-    mode = mode,
-    options = options,
-    partition_by = partition_by,
-    save_args = list(path),
-    ...
-  )
-  
-}
-
 
 #' @export
 #' @rdname spark_write_geojson
diff --git a/R/tests/testthat/test-data-interface-raster.R 
b/R/tests/testthat/test-data-interface-raster.R
index fef1aaf23..45d800475 100644
--- a/R/tests/testthat/test-data-interface-raster.R
+++ b/R/tests/testthat/test-data-interface-raster.R
@@ -19,565 +19,6 @@ context("data interface: raster")
 
 sc <- testthat_spark_connection()
 
-# Read geotiff ---------------
-test_that("Should Pass geotiff loading without readFromCRS and readToCRS", {
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row  %>% select(Geom, height, width, bands) %>% as.list(),
-    list(
-      Geom = "POLYGON ((-13095782 4021226.5, -13095782 3983905, -13058822 
3983905, -13058822 4021226.5, -13095782 4021226.5))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  
-})
-
-test_that("Should Pass geotiff loading with readToCRS", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row  %>% select(Geom, height, width, bands) %>% as.list(),
-    list(
-      Geom = "POLYGON ((-117.64141128097314 33.94356351407699, 
-117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284, 
-117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  
-})
-
-test_that("Should Pass geotiff loading with readFromCRS", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readFromCRS = "EPSG:4499"))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row  %>% select(Geom, height, width, bands) %>% as.list(),
-    list(
-      Geom = "POLYGON ((-13095782 4021226.5, -13095782 3983905, -13058822 
3983905, -13058822 4021226.5, -13095782 4021226.5))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  
-})
-
-test_that("Should Pass geotiff loading with readFromCRS and readToCRS", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326", 
readFromCRS = "EPSG:4499"))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row  %>% select(Geom, height, width, bands) %>% as.list(),
-    list(
-      Geom = "POLYGON ((-117.64141128097314 33.94356351407699, 
-117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284, 
-117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  
-})
-
-test_that("Should Pass geotiff loading with all read options", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, 
-                                    options = list(dropInvalid = TRUE, 
readToCRS = "EPSG:4326", readFromCRS = "EPSG:4499", disableErrorInCRS = TRUE))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row  %>% select(Geom, height, width, bands) %>% as.list(),
-    list(
-      Geom = "POLYGON ((-117.64141128097314 33.94356351407699, 
-117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284, 
-117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-
-# Raster functions ---------------
-test_that("should pass RS_GetBand", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-            RS_GetBand(image.data, 1, image.nBands) as targetBand
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  
-  expect_equal(
-    row$targetBand[[1]] %>% length(),
-    517 * 512
-  )
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-test_that("should pass RS_Base64", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  expect_no_error(
-    sc %>% 
-      DBI::dbGetQuery("SELECT RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring
-                     FROM (
-                       SELECT RS_GetBand(image.data, 1, image.nBands) as 
targetBand, image.height as height, image.width as width
-                       FROM ?) tmp
-                     LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))# %>% 
print()
-  )
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-test_that("should pass RS_HTML", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  expect_no_error(
-    # row <- 
-    sc %>% 
-      DBI::dbGetQuery("SELECT RS_HTML(RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)), '300') as htmlstring
-                     FROM (
-                       SELECT RS_GetBand(image.data, 1, image.nBands) as 
targetBand, image.height as height, image.width as width
-                       FROM ?) tmp
-                     LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  )
-  
-  
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-test_that("should pass RS_GetBand for length of Band 2", {
-  
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/test3.tif"), 
name = sdf_name, options = list(dropInvalid = TRUE))
-  
-  row <-
-    sc %>% 
-    DBI::dbGetQuery("SELECT RS_GetBand(image.data, 2, image.nBands) as 
targetBand, image.height as height, image.width as width
-                       FROM ?  LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  expect_equal(
-    row$targetBand[[1]] %>% length(),
-    32*32
-  )
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-test_that("should pass RS_GetBand for elements of Band 2", {
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/test3.tif"), 
name = sdf_name, options = list(dropInvalid = TRUE))
-  
-  row <-
-    sc %>% 
-    DBI::dbGetQuery("SELECT RS_GetBand(image.data, 2, image.nBands) as 
targetBand, image.height as height, image.width as width
-                       FROM ?  LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  expect_equal(
-    row$targetBand[[1]][2],
-    956.0
-  )
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-test_that("should pass RS_GetBand for elements of Band 4", {
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/test3.tif"), 
name = sdf_name, options = list(dropInvalid = TRUE))
-  
-  row <-
-    sc %>% 
-    DBI::dbGetQuery("SELECT RS_GetBand(image.data, 4, image.nBands) as 
targetBand, image.height as height, image.width as width
-                       FROM ?  LIMIT 1", DBI::dbQuoteIdentifier(sc, sdf_name))
-  expect_equal(
-    row$targetBand[[1]][3],
-    0
-  )
-  
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-
-# Write geotiff ---------------
-test_that("Should Pass geotiff file writing with coalesce", {
-  
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
origin","image.geometry as geometry", "image.height as height", "image.width as 
width", "image.data as data", "image.nBands as nBands"))
-  
-  geotiff_df %>% 
-    sdf_coalesce(1L) %>% 
-    spark_write_geotiff(path = tmp_dest)
-  
-  ## not clear what the issue is here
-  for (file in dir(path = tmp_dest, full.names = TRUE)) load_path <- 
path.expand(file)
-  
-  geotiff_2_sdf <- spark_read_geotiff(sc, path = load_path, options = 
list(dropInvalid = TRUE))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, 
dbplyr::remote_name(geotiff_2_sdf)))
-  
-  expect_equal(
-    row  %>% select(height, width, bands) %>% as.list(),
-    list(
-      # Geom = "POLYGON ((-117.64141128097314 33.94356351407699, 
-117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284, 
-117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
-test_that("Should Pass geotiff file writing with writeToCRS", {
-  
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
origin","image.geometry as geometry", "image.height as height", "image.width as 
width", "image.data as data", "image.nBands as nBands"))
-  
-  geotiff_df %>% 
-    sdf_coalesce(1L) %>% 
-    spark_write_geotiff(path = tmp_dest, options = list(writeToCRS = 
"EPSG:4499"))
-  
-  ## not clear what the issue is here
-  for (file in dir(path = tmp_dest, full.names = TRUE)) load_path <- 
path.expand(file)
-  
-  geotiff_2_sdf <- spark_read_geotiff(sc, path = load_path, options = 
list(dropInvalid = TRUE))
-  
-  row <- sc %>% 
-    DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.data as data,
-             image.nBands as bands 
-             FROM ? LIMIT 1", DBI::dbQuoteIdentifier(sc, 
dbplyr::remote_name(geotiff_2_sdf)))
-  
-  expect_equal(
-    row  %>% select(height, width, bands) %>% as.list(),
-    list(
-      # Geom = "POLYGON ((-117.64141128097314 33.94356351407699, 
-117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284, 
-117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))",
-      height = 517,
-      width = 512,
-      bands = 1
-    )
-  )
-  
-  line1 <- row$data[[1]][1:512] 
-  line2 <- row$data[[1]][513:1024]
-  expect_equal(line1[0 + 1], 0) 
-  expect_equal(line2[159 + 1], 0)
-  expect_equal(line2[160 + 1], 123)
-  
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-  
-})
-
-test_that("Should Pass geotiff file writing without coalesce", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
origin","image.geometry as geometry", "image.height as height", "image.width as 
width", "image.data as data", "image.nBands as nBands"))
-  geotiff_2_sdf <- geotiff_df %>% sdf_register()
-  
-  geotiff_2_sdf %>% 
-    spark_write_geotiff(path = tmp_dest)
-  
-  
-  ## Count created files
-  files <- dir(path = tmp_dest, recursive = TRUE, pattern = "tiff?$")
-  
-  expect_equal(length(files), 3)
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
-test_that("Should Pass geotiff file writing with nested schema", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_sdf %>% 
-    spark_write_geotiff(path = tmp_dest)
-  
-  
-  ## Count created files
-  files <- dir(path = tmp_dest, recursive = TRUE, pattern = "tiff?$")
-  
-  expect_equal(length(files), 3)
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-})
-
-
-test_that("Should Pass geotiff file writing with renamed fields", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
source","image.geometry as geom", "image.height as height", "image.width as 
width", "image.data as data", "image.nBands as bands"))
-  geotiff_2_sdf <- geotiff_df %>% sdf_register()
-  
-  geotiff_2_sdf %>% 
-    spark_write_geotiff(path = tmp_dest, 
-                        mode = "overwrite",
-                        options = list(
-                          fieldOrigin   = "source",
-                          fieldGeometry = "geom",
-                          fieldNBands   = "bands"
-                        ))
-  
-  
-  ## Count created files
-  files <- dir(path = tmp_dest, recursive = TRUE, pattern = "tiff?$")
-  
-  expect_equal(length(files), 3)
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
-test_that("Should Pass geotiff file writing with nested schema and renamed 
fields", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE, readToCRS = "EPSG:4326"))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image as tiff_image"))
-  geotiff_2_sdf <- geotiff_df %>% sdf_register()
-  
-  geotiff_2_sdf %>% 
-    spark_write_geotiff(path = tmp_dest, 
-                        mode = "overwrite",
-                        options = list(
-                          fieldImage   = "tiff_image"
-                        ))
-  
-  
-  ## Count created files
-  files <- dir(path = tmp_dest, recursive = TRUE, pattern = "tiff?$")
-  
-  expect_equal(length(files), 3)
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
-test_that("Should Pass geotiff file writing with converted geometry", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
source","ST_GeomFromWkt(image.geometry) as geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands"))
-  geotiff_2_sdf <- geotiff_df %>% sdf_register()
-  
-  geotiff_2_sdf %>% 
-    spark_write_geotiff(path = tmp_dest, 
-                        mode = "overwrite",
-                        options = list(
-                          fieldOrigin   = "source",
-                          fieldGeometry = "geom",
-                          fieldNBands   = "bands"
-                        ))
-  
-  
-  ## Count created files
-  files <- dir(path = tmp_dest, recursive = TRUE, pattern = "tiff?$")
-  
-  expect_equal(length(files), 3)
-  
-  ## Cleanup
-  unlink(tmp_dest, recursive = TRUE)
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
-test_that("Should Pass geotiff file writing with handling invalid schema", {
-  ## Load
-  sdf_name <- random_string("spatial_sdf")
-  geotiff_sdf <- spark_read_geotiff(sc, path = test_data("raster/"), name = 
sdf_name, options = list(dropInvalid = TRUE))
-  
-  ## Write
-  tmp_dest <- tempfile()
-  
-  geotiff_df <- geotiff_sdf %>% spark_dataframe()
-  geotiff_df <- invoke(geotiff_df, "selectExpr", list("image.origin as 
origin","image.geometry as geometry", "image.height as height", "image.width as 
width", "image.data as data"))
-  geotiff_2_sdf <- geotiff_df %>% sdf_register()
-  
-  expect_error(
-    geotiff_2_sdf %>% 
-      spark_write_geotiff(path = tmp_dest, 
-                          mode = "overwrite",
-                          options = list(
-                            fieldImage   = "tiff_image"
-                          )),
-    regexp = "Invalid GeoTiff Schema"
-  )
-  
-  ## Cleanup
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", sdf_name))
-  sc %>% DBI::dbExecute(paste0("DROP TABLE ", 
dbplyr::remote_name(geotiff_2_sdf)))
-})
-
 
 # Read Binary and RS_functions  -----------------
 # Only functions related to reading
diff --git a/R/vignettes/articles/raster.Rmd b/R/vignettes/articles/raster.Rmd
index 8576e2b8f..8c6a4394b 100644
--- a/R/vignettes/articles/raster.Rmd
+++ b/R/vignettes/articles/raster.Rmd
@@ -121,69 +121,4 @@ raster %>%
                      ))
 
 dir(dest_file, recursive = TRUE)
-```
-
-
-# Using the Sedona Geotiff Dataframe Loader
-
-The Sedona Geotiff Dataframe Loader will read data from GeoTiff file (or 
folder containing multiple files) into a Spark DataFrame. The resulting data is 
a nested column. It can be unnested using SQL (results are collected)...:
-```{r}
-data_tbl <- spark_read_geotiff(sc, path = 
here::here("../core/src/test/resources/raster/"), name = "data", options = 
list(dropInvalid = TRUE))
-data_tbl
-
-## Using a direct SQL query: results are collected directly
-sc %>% 
-  DBI::dbGetQuery("SELECT 
-             image.geometry as Geom, 
-             image.height as height, 
-             image.width as width, 
-             image.nBands as bands 
-             FROM data")
-```
-
-... or using `{sparklyr.nested}` (results stay in Spark until collection):
-```{r}
-library(sparklyr.nested)
-
-data_tbl %>% sdf_schema_json(parse_json = TRUE) %>% lobstr::tree()
-
-data_tbl %>% 
-  sdf_unnest(image) %>% 
-  glimpse()
-```
-
-```{r}
-res <- 
-  data_tbl %>% 
-  sdf_unnest(image) %>% 
-  mutate(
-    mult = RS_MultiplyFactor(data, 2L)
-  ) %>% 
-  select(data, mult) %>% 
-  collect()
-
-res$data[[1]][750:760]
-res$mult[[1]][750:760]
-```
-
-
-Writing data back:
-```{r}
-dest_file <- tempfile()
-data_tbl %>% 
-  sdf_unnest(image) %>% 
-  mutate(
-    data = RS_MultiplyFactor(data, 2L)
-  ) %>% 
-  spark_write_geotiff(path = dest_file, mode = "overwrite")
-
-dir(dest_file, recursive = TRUE)
-```
-
-
-```{r include=FALSE}
-## Clean-up
-sc %>% spark_disconnect()
-spark_disconnect_all()
-rm(sc)
 ```
\ No newline at end of file
diff --git a/docs/api/sql/Raster-loader.md b/docs/api/sql/Raster-loader.md
index 129b432ff..7902a70f6 100644
--- a/docs/api/sql/Raster-loader.md
+++ b/docs/api/sql/Raster-loader.md
@@ -1,20 +1,18 @@
 !!!note
        Sedona loader are available in Scala, Java and Python and have the same 
APIs.
 
-Sedona provides two types of raster DataFrame loaders. They both use Sedona 
built-in data source but load raster images to different internal formats.
+## Load any raster to Raster format
 
-## Load any raster to RasterUDT format
-
-The raster loader of Sedona leverages Spark built-in binary data source and 
works with several RS RasterUDT constructors to produce RasterUDT type. Each 
raster is a row in the resulting DataFrame and stored in a `RasterUDT` format.
+The raster loader of Sedona leverages Spark built-in binary data source and 
works with several RS constructors to produce Raster type. Each raster is a row 
in the resulting DataFrame and stored in a `Raster` format.
 
 By default, these functions uses lon/lat order since `v1.5.0`. Before, it used 
lat/lon order.
 
 ### Load raster to a binary DataFrame
 
-You can load any type of raster data using the code below. Then use the RS 
constructors below to create RasterUDT.
+You can load any type of raster data using the code below. Then use the RS 
constructors below to create a Raster DataFrame.
 
 ```scala
-spark.read.format("binaryFile").load("/some/path/*.asc")
+sedona.read.format("binaryFile").load("/some/path/*.asc")
 ```
 
 
@@ -29,7 +27,7 @@ Since: `v1.4.0`
 Spark SQL example:
 
 ```scala
-var df = spark.read.format("binaryFile").load("/some/path/*.asc")
+var df = sedona.read.format("binaryFile").load("/some/path/*.asc")
 df = df.withColumn("raster", f.expr("RS_FromArcInfoAsciiGrid(content)"))
 ```
 
@@ -45,7 +43,7 @@ Since: `v1.4.0`
 Spark SQL example:
 
 ```scala
-var df = spark.read.format("binaryFile").load("/some/path/*.tiff")
+var df = sedona.read.format("binaryFile").load("/some/path/*.tiff")
 df = df.withColumn("raster", f.expr("RS_FromGeoTiff(content)"))
 ```
 
@@ -160,128 +158,3 @@ Output:
 |                                              GridCoverage2D["g...|
 +------------------------------------------------------------------+
 ```
-
-## Load GeoTiff to Array[Double] format
-
-!!!warning
-       This function has been deprecated since v1.4.1. Please use 
`RS_FromGeoTiff` instead and `binaryFile` data source to read GeoTiff files.
-
-The `geotiff` loader of Sedona is a Spark built-in data source. It can read a 
single geotiff image or a number of geotiff images into a DataFrame. Each 
geotiff is a row in the resulting DataFrame and stored in an array of Double 
type format.
-
-Since: `v1.1.0`
-
-Spark SQL example:
-
-The input path could be a path to a single GeoTiff image or a directory of 
GeoTiff images.
- You can optionally append an option to drop invalid images. The geometry 
bound of each image is automatically loaded
-as a Sedona geometry and is transformed to WGS84 (EPSG:4326) reference system.
-
-```scala
-var geotiffDF = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load("YOUR_PATH")
-geotiffDF.printSchema()
-```
-
-Output:
-
-```html
- |-- image: struct (nullable = true)
- |    |-- origin: string (nullable = true)
- |    |-- Geometry: string (nullable = true)
- |    |-- height: integer (nullable = true)
- |    |-- width: integer (nullable = true)
- |    |-- nBands: integer (nullable = true)
- |    |-- data: array (nullable = true)
- |    |    |-- element: double (containsNull = true)
-```
-
-There are three more optional parameters for reading GeoTiff:
-
-```html
- |-- readfromCRS: Coordinate reference system of the geometry coordinates 
representing the location of the Geotiff. An example value of readfromCRS is 
EPSG:4326.
- |-- readToCRS: If you want to transform the Geotiff location geometry 
coordinates to a different coordinate reference system, you can define the 
target coordinate reference system with this option.
- |-- disableErrorInCRS: (Default value false) => Indicates whether to ignore 
errors in CRS transformation.
-```
-
-An example with all GeoTiff read options:
-
-```scala
-var geotiffDF = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readFromCRS", "EPSG:4499").option("readToCRS", 
"EPSG:4326").option("disableErrorInCRS", true).load("YOUR_PATH")
-geotiffDF.printSchema()
-```
-
-Output:
-
-```html
- |-- image: struct (nullable = true)
- |    |-- origin: string (nullable = true)
- |    |-- Geometry: string (nullable = true)
- |    |-- height: integer (nullable = true)
- |    |-- width: integer (nullable = true)
- |    |-- nBands: integer (nullable = true)
- |    |-- data: array (nullable = true)
- |    |    |-- element: double (containsNull = true)
-```
-
-You can also select sub-attributes individually to construct a new DataFrame
-
-```scala
-geotiffDF = geotiffDF.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-geotiffDF.createOrReplaceTempView("GeotiffDataframe")
-geotiffDF.show()
-```
-
-Output:
-
-```html
-+--------------------+--------------------+------+-----+--------------------+-----+
-|              origin|                Geom|height|width|                
data|bands|
-+--------------------+--------------------+------+-----+--------------------+-----+
-|file:///home/hp/D...|POLYGON ((-58.699...|    32|   32|[1058.0, 1039.0, ...|  
  4|
-|file:///home/hp/D...|POLYGON ((-58.297...|    32|   32|[1258.0, 1298.0, ...|  
  4|
-+--------------------+--------------------+------+-----+--------------------+-----+
-```
-
-### RS_Array
-
-Introduction: Create an array that is filled by the given value
-
-Format: `RS_Array(length:Int, value: Double)`
-
-Since: `v1.1.0`
-
-Spark SQL example:
-
-```scala
-SELECT RS_Array(height * width, 0.0)
-```
-
-### RS_GetBand
-
-Introduction: Return a particular band from Geotiff Dataframe
-
-The number of total bands can be obtained from the GeoTiff loader
-
-Format: `RS_GetBand (allBandValues: Array[Double], targetBand:Int, 
totalBands:Int)`
-
-Since: `v1.1.0`
-
-!!!note
-       Index of targetBand starts from 1 (instead of 0). Index of the first 
band is 1.
-
-Spark SQL example:
-
-```scala
-val BandDF = spark.sql("select RS_GetBand(data, 2, Band) as targetBand from 
GeotiffDataframe")
-BandDF.show()
-```
-
-Output:
-
-```html
-+--------------------+
-|          targetBand|
-+--------------------+
-|[1058.0, 1039.0, ...|
-|[1258.0, 1298.0, ...|
-+--------------------+
-```
diff --git a/docs/api/sql/Raster-operators.md b/docs/api/sql/Raster-operators.md
index f7e07770a..32b91178c 100644
--- a/docs/api/sql/Raster-operators.md
+++ b/docs/api/sql/Raster-operators.md
@@ -881,8 +881,8 @@ Array(5.0, 3.0)
 
 Spark SQL example for joining a point dataset with a raster dataset:
 ```scala
-val pointDf = spark.read...
-val rasterDf = spark.read.format("binaryFile").load("/some/path/*.tiff")
+val pointDf = sedona.read...
+val rasterDf = sedona.read.format("binaryFile").load("/some/path/*.tiff")
   .withColumn("raster", expr("RS_FromGeoTiff(content)"))
   .withColumn("envelope", expr("RS_Envelope(raster)"))
 
@@ -1034,21 +1034,18 @@ val sumDF = spark.sql("select RS_Add(band1, band2) as 
sumOfBands from dataframe"
 
 ```
 
-### RS_Append
+### RS_Array
 
-Introduction: Appends a new band to the end of Geotiff image data and returns 
the new data. The new band to be appended can be a normalized difference index 
between two bands (example: NBR, NDBI). Normalized difference index between two 
bands can be calculated with RS_NormalizedDifference operator described earlier 
in this page. Specific bands can be retrieved using RS_GetBand operator 
described [here](../Raster-loader/).
+Introduction: Create an array that is filled with the given value
 
-Format: `RS_Append(data: Array[Double], newBand: Array[Double], nBands: Int)`
+Format: `RS_Array(length:Int, value: Double)`
 
-Since: `v1.2.1`
-
-Deprecated since: `v1.4.1`
+Since: `v1.1.0`
 
 Spark SQL example:
-```scala
-
-val dfAppended = spark.sql("select RS_Append(data, normalizedDifference, 
nBands) as dataEdited from dataframe")
 
+```scala
+SELECT RS_Array(height * width, 0.0)
 ```
 
 ### RS_BitwiseAND
diff --git a/docs/api/sql/Raster-writer.md b/docs/api/sql/Raster-writer.md
index 0ddcee04d..7df229ac9 100644
--- a/docs/api/sql/Raster-writer.md
+++ b/docs/api/sql/Raster-writer.md
@@ -163,142 +163,3 @@ 
sparkSession.read.format("binaryFile").load("my_raster_file/*")
 Then you can create Raster type in Sedona like this `RS_FromGeoTiff(content)` 
(if the written data was in GeoTiff format).
 
 The newly created DataFrame can be written to disk again but must be under a 
different name such as `my_raster_file_modified`
-
-
-## Write Array[Double] to GeoTiff files
-
-!!!warning
-       This function has been deprecated since v1.4.1. Please use 
`RS_AsGeoTiff` instead and `raster` data source to write GeoTiff files.
-
-Introduction: You can write a GeoTiff dataframe as GeoTiff images using the 
spark `write` feature with the format `geotiff`. The geotiff raster column 
needs to be an array of double type data.
-
-Since: `v1.2.1`
-
-Spark SQL example:
-
-The schema of the GeoTiff dataframe to be written can be one of the following 
two schemas:
-
-```html
- |-- image: struct (nullable = true)
- |    |-- origin: string (nullable = true)
- |    |-- Geometry: geometry (nullable = true)
- |    |-- height: integer (nullable = true)
- |    |-- width: integer (nullable = true)
- |    |-- nBands: integer (nullable = true)
- |    |-- data: array (nullable = true)
- |    |    |-- element: double (containsNull = true)
-```
-
-or
-
-```html
- |-- origin: string (nullable = true)
- |-- Geometry: geometry (nullable = true)
- |-- height: integer (nullable = true)
- |-- width: integer (nullable = true)
- |-- nBands: integer (nullable = true)
- |-- data: array (nullable = true)
- |    |-- element: double (containsNull = true)
-```
-
-Field names can be renamed, but schema should exactly match with one of the 
above two schemas. The output path could be a path to a directory where GeoTiff 
images will be saved. If the directory already exists, `write` should be called 
in `overwrite` mode.
-
-```scala
-var dfToWrite = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load("PATH_TO_INPUT_GEOTIFF_IMAGES")
-dfToWrite.write.format("geotiff").save("DESTINATION_PATH")
-```
-
-You can override an existing path with the following approach:
-
-```scala
-dfToWrite.write.mode("overwrite").format("geotiff").save("DESTINATION_PATH")
-```
-
-You can also extract the columns nested within `image` column and write the 
dataframe as GeoTiff image.
-
-```scala
-dfToWrite = dfToWrite.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-dfToWrite.write.mode("overwrite").format("geotiff").save("DESTINATION_PATH")
-```
-
-If you want the saved GeoTiff images not to be distributed into multiple 
partitions, you can call coalesce to merge all files in a single partition.
-
-```scala
-dfToWrite.coalesce(1).write.mode("overwrite").format("geotiff").save("DESTINATION_PATH")
-```
-
-In case, you rename the columns of GeoTiff dataframe, you can set the 
corresponding column names with the `option` parameter. All available optional 
parameters are listed below:
-
-```html
- |-- writeToCRS: (Default value "EPSG:4326") => Coordinate reference system of 
the geometry coordinates representing the location of the Geotiff.
- |-- fieldImage: (Default value "image") => Indicates the image column of 
GeoTiff DataFrame.
- |-- fieldOrigin: (Default value "origin") => Indicates the origin column of 
GeoTiff DataFrame.
- |-- fieldNBands: (Default value "nBands") => Indicates the nBands column of 
GeoTiff DataFrame.
- |-- fieldWidth: (Default value "width") => Indicates the width column of 
GeoTiff DataFrame.
- |-- fieldHeight: (Default value "height") => Indicates the height column of 
GeoTiff DataFrame.
- |-- fieldGeometry: (Default value "geometry") => Indicates the geometry 
column of GeoTiff DataFrame.
- |-- fieldData: (Default value "data") => Indicates the data column of GeoTiff 
DataFrame.
-```
-
-An example:
-
-```scala
-dfToWrite = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load("PATH_TO_INPUT_GEOTIFF_IMAGES")
-dfToWrite = dfToWrite.selectExpr("image.origin as 
source","ST_GeomFromWkt(image.geometry) as geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-dfToWrite.write.mode("overwrite").format("geotiff").option("writeToCRS", 
"EPSG:4326").option("fieldOrigin", "source").option("fieldGeometry", 
"geom").option("fieldNBands", "bands").save("DESTINATION_PATH")
-```
-
-## Write Array[Double] to other formats
-
-### RS_Base64
-
-Introduction: Return a Base64 String from a geotiff image
-
-Format: `RS_Base64 (height:Int, width:Int, redBand: Array[Double], greenBand: 
Array[Double], blackBand: Array[Double],
-optional: alphaBand: Array[Double])`
-
-Since: `v1.1.0`
-
-Spark SQL example:
-```scala
-val BandDF = spark.sql("select RS_Base64(h, w, band1, band2, RS_Array(h*w, 0)) 
as baseString from dataframe")
-BandDF.show()
-```
-
-Output:
-
-```html
-+--------------------+
-|          baseString|
-+--------------------+
-|QJCIAAAAAABAkDwAA...|
-|QJOoAAAAAABAlEgAA...|
-+--------------------+
-```
-
-!!!note
-    Although the 3 RGB bands are mandatory, you can use [RS_Array(h*w, 
0.0)](#rs_array) to create an array (zeroed out, size = h * w) as input.
-
-
-### RS_HTML
-
-Introduction: Return a html img tag with the base64 string embedded
-
-Format: `RS_HTML(base64:String, optional: width_in_px:String)`
-
-Spark SQL example:
-
-```scala
-df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" ).show()
-```
-
-Output:
-
-```html
-+--------------------+
-|          htmlstring|
-+--------------------+
-|<img src="data:im...|
-|<img src="data:im...|
-+--------------------+
-```
diff --git 
a/sql/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 000000000..7922c6ed8
--- /dev/null
+++ 
b/sql/common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
\ No newline at end of file
diff --git a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala 
b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index efa7f5b38..2b8a5049f 100644
--- a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -185,7 +185,6 @@ object Catalog {
     function[RS_BitwiseOr](),
     function[RS_CountValue](),
     function[RS_Modulo](),
-    function[RS_GetBand](),
     function[RS_SquareRoot](),
     function[RS_LogicalDifference](),
     function[RS_LogicalOver](),
@@ -193,7 +192,6 @@ object Catalog {
     function[RS_HTML](),
     function[RS_Array](),
     function[RS_Normalize](),
-    function[RS_Append](),
     function[RS_AddBandFromArray](),
     function[RS_BandAsArray](),
     function[RS_MapAlgebra](null),
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
index 1dc2d45a3..49a6ffdbc 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
@@ -21,111 +21,18 @@ package org.apache.spark.sql.sedona_sql.expressions.raster
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.expressions.ImplicitCastInputTypes
-import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
ImplicitCastInputTypes, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.geotools.coverage.grid.io.GridFormatFinder
-import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
-import org.geotools.geometry.jts.JTS
-import org.geotools.referencing.CRS
-import org.geotools.util.factory.Hints
-import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
-import org.opengis.coverage.grid.GridEnvelope
-import org.opengis.referencing.crs.CoordinateReferenceSystem
-import org.opengis.referencing.operation.MathTransform
 
 import java.awt.Color
 import java.awt.image.BufferedImage
-import java.io.{ByteArrayOutputStream, IOException}
+import java.io.ByteArrayOutputStream
 import java.util.Base64
 import javax.imageio.ImageIO
 
-class GeometryOperations {
-
-  var coverage:GridCoverage2D = null
-  var source:CoordinateReferenceSystem = null
-  var target:CoordinateReferenceSystem = null
-  var targetCRS:MathTransform =  null
-
-  def getDimensions(url:String):GridEnvelope = {
-    val format = GridFormatFinder.findFormat(url)
-    val hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
-    val reader = format.getReader(url, hints)
-
-
-    try coverage = reader.read(null)
-    catch {
-      case giveUp: IOException =>
-        throw new RuntimeException(giveUp)
-    }
-    reader.dispose()
-    source = coverage.getCoordinateReferenceSystem
-    target = CRS.decode("EPSG:4326", true)
-    targetCRS = CRS.findMathTransform(source, target)
-    val gridRange2D = coverage.getGridGeometry.getGridRange
-    gridRange2D
-
-  }
-   def readGeometry(url: String): Geometry = {
-    val gridRange2D = getDimensions(url)
-    val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), 
Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), 
Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), 
Array(gridRange2D.getHigh(0), gridRange2D.getLow(1)))
-    val polyCoordinates = new Array[Coordinate](5)
-    var index = 0
-
-    for (point <- cords) {
-      val coordinate2D = new GridCoordinates2D(point(0), point(1))
-      val result = coverage.getGridGeometry.gridToWorld(coordinate2D)
-      polyCoordinates({
-        index += 1; index - 1
-      }) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1))
-    }
-
-    polyCoordinates(index) = polyCoordinates(0)
-    val factory = new GeometryFactory
-    val polygon = JTS.transform(factory.createPolygon(polyCoordinates), 
targetCRS)
-
-    polygon
-
-  }
-}
-
-// get a particular band from a results of ST_GeomWithBandsFromGeoTiff
-case class RS_GetBand(inputExpressions: Seq[Expression])
-  extends Expression with CodegenFallback with UserDataGeneratator {
-  override def nullable: Boolean = false
-
-  override def eval(inputRow: InternalRow): Any = {
-    // This is an expression which takes one input expressions
-    assert(inputExpressions.length == 3)
-    val bandInfo 
=inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
-    val targetBand = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
-    val totalBands = inputExpressions(2).eval(inputRow).asInstanceOf[Int]
-    val result = gettargetband(bandInfo, targetBand, totalBands)
-    new GenericArrayData(result)
-  }
-
-  // fetch target band from the given array of bands
-  private def gettargetband(bandinfo: Array[Double], targetband:Int, 
totalbands:Int): Array[Double] = {
-    val sizeOfBand = bandinfo.length/totalbands
-    val lowerBound = (targetband - 1)*sizeOfBand
-    val upperBound = targetband*sizeOfBand
-    assert(bandinfo.slice(lowerBound,upperBound).length == sizeOfBand)
-    bandinfo.slice(lowerBound, upperBound)
-
-  }
-
-  override def dataType: DataType = ArrayType(DoubleType)
-
-  override def children: Seq[Expression] = inputExpressions
-
-  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
-    copy(inputExpressions = newChildren)
-  }
-}
-
 case class RS_Array(inputExpressions: Seq[Expression])
   extends Expression with ImplicitCastInputTypes with CodegenFallback with 
UserDataGeneratator {
   override def nullable: Boolean = false
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/MapAlgebra.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/MapAlgebra.scala
index 9734f9255..38960fa0e 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/MapAlgebra.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/MapAlgebra.scala
@@ -27,8 +27,6 @@ import 
org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
 import org.apache.spark.sql.sedona_sql.expressions.{InferredExpression, 
UserDataGeneratator}
 import org.apache.spark.sql.types._
 
-import javax.media.jai.RasterFactory
-
 /// Calculate Normalized Difference between two bands
 case class RS_NormalizedDifference(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback with UserDataGeneratator {
@@ -772,40 +770,6 @@ case class RS_Normalize(inputExpressions: Seq[Expression])
   }
 }
 
-
-/// Appends a new band to the image array data
-case class RS_Append(inputExpressions: Seq[Expression])
-  extends Expression with CodegenFallback with UserDataGeneratator {
-  // This is an expression which takes three input expressions
-  assert(inputExpressions.length == 3)
-
-  override def nullable: Boolean = false
-
-  override def eval(inputRow: InternalRow): Any = {
-    val data = 
inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
-    val newBand = 
inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
-    val nBands = inputExpressions(2).eval(inputRow).asInstanceOf[Int]
-
-    val appendedData = append(data, newBand, nBands)
-    new GenericArrayData(appendedData)
-  }
-  private def append(data: Array[Double], newBand: Array[Double], nBands: 
Int): Array[Double] = {
-    val bandLength = data.length/nBands
-    assert(newBand.length == bandLength)
-
-    // concat newBand to the end of data and return concatenated result
-    data ++ newBand
-  }
-
-  override def dataType: DataType = ArrayType(DoubleType)
-
-  override def children: Seq[Expression] = inputExpressions
-
-  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
-    copy(inputExpressions = newChildren)
-  }
-}
-
 case class RS_AddBandFromArray(inputExpressions: Seq[Expression])
   extends InferredExpression(inferrableFunction3(MapAlgebra.addBandFromArray), 
inferrableFunction4(MapAlgebra.addBandFromArray), 
inferrableFunction2(MapAlgebra.addBandFromArray)) {
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
deleted file mode 100644
index 90c0ec55c..000000000
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.sedona_sql.io.raster
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.types._
-import org.geotools.coverage.grid.io.{AbstractGridFormat, 
GridCoverage2DReader, OverviewPolicy}
-import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
-import org.geotools.gce.geotiff.GeoTiffReader
-import org.geotools.geometry.jts.JTS
-import org.geotools.referencing.CRS
-import org.locationtech.jts.geom.{Coordinate, GeometryFactory, Polygon}
-import org.opengis.coverage.grid.{GridCoordinates, GridEnvelope}
-import org.opengis.parameter.{GeneralParameterValue, ParameterValue}
-import org.opengis.referencing.crs.CoordinateReferenceSystem
-import org.opengis.referencing.operation.MathTransform
-
-import java.io.ByteArrayInputStream
-
-object GeotiffSchema {
-  val undefinedImageType = "Undefined"
-
-  /**
-    * Schema for the image column: Row(String,Geometry, Int, Int, Int, 
Array[Double])
-    */
-  val columnSchema = StructType(
-    StructField("origin", StringType, true) ::
-      StructField("geometry", StringType, true) ::
-      StructField("height", IntegerType, false) ::
-      StructField("width", IntegerType, false) ::
-      StructField("nBands", IntegerType, false) ::
-      StructField("data", ArrayType(DoubleType), false) :: Nil)
-
-  val imageFields: Array[String] = columnSchema.fieldNames
-
-  /**
-    * DataFrame with a single column of images named "image" (nullable)
-    */
-  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
-
-  /**
-    * Gets the origin of the image
-    *
-    * @return The origin of the image
-    */
-  def getOrigin(row: Row): String = row.getString(0)
-
-  /**
-    * Gets the origin of the image
-    *
-    * @return The origin of the image
-    */
-  def getGeometry(row: Row): GeometryUDT = row.getAs[GeometryUDT](1)
-
-
-  /**
-    * Gets the height of the image
-    *
-    * @return The height of the image
-    */
-  def getHeight(row: Row): Int = row.getInt(2)
-
-  /**
-    * Gets the width of the image
-    *
-    * @return The width of the image
-    */
-  def getWidth(row: Row): Int = row.getInt(3)
-
-  /**
-    * Gets the number of channels in the image
-    *
-    * @return The number of bands in the image
-    */
-  def getNBands(row: Row): Int = row.getInt(4)
-
-
-  /**
-    * Gets the image data
-    *
-    * @return The image data
-    */
-  def getData(row: Row): Array[Double] = row.getAs[Array[Double]](5)
-
-  /**
-    * Default values for the invalid image
-    *
-    * @param origin Origin of the invalid image
-    * @return Row with the default values
-    */
-  private[io] def invalidImageRow(origin: String): Row =
-    Row(Row(origin, -1, -1, -1, Array.ofDim[Byte](0)))
-
-  /**
-    *
-    * Convert a GeoTiff image into a dataframe row
-    *
-    * @param origin Arbitrary string that identifies the image
-    * @param bytes  Image bytes (for example, jpeg)
-    * @return DataFrame Row or None (if the decompression fails)
-    *
-    */
-
-  private[io] def decode(origin: String, bytes: Array[Byte], 
imageSourceOptions: ImageReadOptions): Option[Row] = {
-
-    val policy: ParameterValue[OverviewPolicy] = 
AbstractGridFormat.OVERVIEW_POLICY.createValue
-    policy.setValue(OverviewPolicy.IGNORE)
-    val gridsize: ParameterValue[String] = 
AbstractGridFormat.SUGGESTED_TILE_SIZE.createValue
-    val useJaiRead: ParameterValue[Boolean] = 
AbstractGridFormat.USE_JAI_IMAGEREAD.createValue.asInstanceOf[ParameterValue[Boolean]]
-    useJaiRead.setValue(true)
-
-    // Read Geotiff image from Byte Array
-    val reader: GridCoverage2DReader = try {
-      new GeoTiffReader(new ByteArrayInputStream(bytes))
-    } catch {
-      // Catch runtime exception because `ImageIO` may throw unexpected 
`RuntimeException`.
-      // But do not catch the declared `IOException` (regarded as FileSystem 
failure)
-      case _: RuntimeException => null
-    }
-    var coverage: GridCoverage2D = null
-    if (reader == null) {
-      None
-    } else {
-      coverage = reader.read(Array[GeneralParameterValue](policy, gridsize, 
useJaiRead))
-    }
-
-    // Fetch geometry from given image
-    var source: CoordinateReferenceSystem = try {
-      coverage.getCoordinateReferenceSystem
-    }
-    catch {
-      case _: Exception => null
-    }
-    if (source == null && imageSourceOptions.readFromCRS != "") {
-      source = CRS.decode(imageSourceOptions.readFromCRS, true)
-    }
-
-    val target: CoordinateReferenceSystem = if (imageSourceOptions.readToCRS 
!= "") {
-      CRS.decode(imageSourceOptions.readToCRS, true)
-    } else {
-      null
-    }
-
-    var targetCRS: MathTransform = null
-    if (target != null) {
-      if (source == null) {
-        throw new IllegalArgumentException("Invalid arguments. Source 
coordinate reference system was not found.")
-      } else {
-        targetCRS = CRS.findMathTransform(source, target, 
imageSourceOptions.disableErrorInCRS)
-      }
-    }
-
-    val gridRange2D = coverage.getGridGeometry.getGridRange
-    val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), 
Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), 
Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), 
Array(gridRange2D.getHigh(0), gridRange2D.getLow(1)))
-    val polyCoordinates = new Array[Coordinate](5)
-    var index = 0
-
-    for (point <- cords) {
-      val coordinate2D = new GridCoordinates2D(point(0), point(1))
-      val result = coverage.getGridGeometry.gridToWorld(coordinate2D)
-      polyCoordinates({
-        index += 1;
-        index - 1
-      }) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1))
-    }
-
-    polyCoordinates(index) = polyCoordinates(0)
-    val factory = new GeometryFactory
-    var polygon = factory.createPolygon(polyCoordinates)
-    if (targetCRS != null) {
-      polygon = JTS.transform(polygon, targetCRS).asInstanceOf[Polygon]
-    }
-
-    // Fetch band values from given image
-    val nBands: Int = coverage.getNumSampleDimensions
-    val dimensions: GridEnvelope = reader.getOriginalGridRange
-    val maxDimensions: GridCoordinates = dimensions.getHigh
-    val width: Int = maxDimensions.getCoordinateValue(0) + 1
-    val height: Int = maxDimensions.getCoordinateValue(1) + 1
-    val imageSize = height * width * nBands
-    assert(imageSize < 1e9, "image is too large")
-    val decoded = Array.ofDim[Double](imageSize)
-
-    for (i <- 0 until height) {
-      for (j <- 0 until width) {
-        val vals: Array[Double] = new Array[Double](nBands)
-        coverage.evaluate(new GridCoordinates2D(j, i), vals)
-        // bands of a pixel will be put in [b0...b1...b2...]
-        // Each "..." represent w * h pixels
-        for (bandId <- 0 until nBands) {
-          val offset = i * width + j + width * height * bandId
-          decoded(offset) = vals(bandId)
-        }
-      }
-    }
-    // the internal "Row" is needed, because the image is a single DataFrame 
column
-    Some(Row(Row(origin, polygon.toText, height, width, nBands, decoded)))
-  }
-}
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
deleted file mode 100644
index 552b8f8e8..000000000
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.sedona_sql.io.raster
-
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-
-private[io] class ImageReadOptions(@transient private val parameters: 
CaseInsensitiveMap[String]) extends Serializable {
-  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
-  /**
-   * Optional parameters for reading GeoTiff
-   * dropInvalid indicatesWhether to drop invalid images. If true, invalid 
images will be removed, otherwise
-   * invalid images will be returned with empty data and all other field 
filled with `-1`.
-   * disableErrorInCRS indicates whether to disable to errors in CRS 
transformation
-   * readFromCRS and readToCRS indicate source and target coordinate reference 
system, respectively.
-   */
-  val dropInvalid = parameters.getOrElse("dropInvalid", "false").toBoolean
-  val disableErrorInCRS = parameters.getOrElse("disableErrorInCRS", 
"false").toBoolean
-  val readFromCRS = parameters.getOrElse("readFromCRS", "")
-  val readToCRS = parameters.getOrElse("readToCRS", "")
-
-}
\ No newline at end of file
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
deleted file mode 100644
index 6a730faa3..000000000
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.sedona_sql.io.raster
-
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-
-private[io] class ImageWriteOptions(@transient private val parameters: 
CaseInsensitiveMap[String]) extends Serializable {
-  def this(parameters: Map[String, String]) = 
this(CaseInsensitiveMap(parameters))
-
-  // Optional parameters for writing GeoTiff
-  val writeToCRS = parameters.getOrElse("writeToCRS", "EPSG:4326")
-  val colImage = parameters.getOrElse("fieldImage", "image")
-  val colOrigin = parameters.getOrElse("fieldOrigin", "origin")
-  val colBands = parameters.getOrElse("fieldNBands", "nBands")
-  val colWidth = parameters.getOrElse("fieldWidth", "width")
-  val colHeight = parameters.getOrElse("fieldHeight", "height")
-  val colGeometry = parameters.getOrElse("fieldGeometry", "geometry")
-  val colData = parameters.getOrElse("fieldData", "data")
-
-}
\ No newline at end of file
diff --git a/sql/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
new file mode 100644
index 000000000..2c219d801
--- /dev/null
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SaveMode
+import org.scalatest.{BeforeAndAfter, GivenWhenThen}
+
+import java.io.File
+import java.nio.file.Files
+
+class rasterIOTest extends TestBaseScala with BeforeAndAfter with 
GivenWhenThen {
+
+  var rasterdatalocation: String = resourceFolder + "raster/"
+  val tempDir: String = 
Files.createTempDirectory("sedona_raster_io_test_").toFile.getAbsolutePath
+
+  describe("Raster IO test") {
+    it("should read geotiff using binary source and write geotiff back to disk 
using raster source") {
+      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val rasterCount = rasterDf.count()
+      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
+      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
+      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
+      assert(rasterDf.count() == rasterCount)
+    }
+
+    it("should read and write geotiff using given options") {
+      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val rasterCount = rasterDf.count()
+      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
+      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
+      assert(rasterDf.count() == rasterCount)
+    }
+
+    it("should read and write via RS_FromGeoTiff and RS_AsGeoTiff") {
+      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsGeoTiff(raster) as content", "path")
+      val rasterCount = rasterDf.count()
+      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
+      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
+      assert(rasterDf.count() == rasterCount)
+    }
+
+    it("should handle null") {
+      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      var rasterDf = df.selectExpr("RS_FromGeoTiff(null) as raster", 
"length").selectExpr("RS_AsGeoTiff(raster) as content", "length")
+      val rasterCount = rasterDf.count()
+      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
+      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
+      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
+      assert(rasterCount == 3)
+      assert(rasterDf.count() == 0)
+    }
+
+    it("should read RS_FromGeoTiff and write RS_AsArcGrid") {
+      var df = sparkSession.read.format("binaryFile").load(resourceFolder + 
"raster_geotiff_color/*")
+      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsArcGrid(raster, 1) as content", "path")
+      val rasterCount = rasterDf.count()
+      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".asc").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
+      rasterDf = df.selectExpr("RS_FromArcInfoAsciiGrid(content)")
+      assert(rasterDf.count() == rasterCount)
+    }
+  }
+
+  override def afterAll(): Unit = FileUtils.deleteDirectory(new File(tempDir))
+}
\ No newline at end of file
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
index 0b730d8db..0f1ba15c8 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
@@ -220,26 +220,6 @@ class rasteralgebraTest extends TestBaseScala with 
BeforeAndAfter with GivenWhen
     }
   }
 
-  describe("Should pass all transformation tests") {
-    it("Passed RS_Append for new data length and new band elements") {
-      var df = Seq(Seq(200.0, 400.0, 600.0, 800.0, 100.0, 500.0, 800.0, 
600.0)).toDF("data")
-      df = df.selectExpr("data", "2 as nBands")
-      var rowFirst = df.first()
-      val nBands = rowFirst.getAs[Int](1)
-      val lengthInitial = 
rowFirst.getAs[mutable.WrappedArray[Double]](0).length
-      val lengthBand = lengthInitial / nBands
-
-      df = df.selectExpr("data", "nBands", "RS_GetBand(data, 1, nBands) as 
band1", "RS_GetBand(data, 2, nBands) as band2")
-      df = df.selectExpr("data", "nBands", "RS_NormalizedDifference(band2, 
band1) as normalizedDifference")
-      df = df.selectExpr("RS_Append(data, normalizedDifference, nBands) as 
targetData")
-
-      rowFirst = df.first()
-      assert(rowFirst.getAs[mutable.WrappedArray[Double]](0).length == 
lengthInitial + lengthBand)
-      assert((rowFirst.getAs[mutable.WrappedArray[Double]](0)(lengthInitial) 
== 0.33) &&
-        (rowFirst.getAs[mutable.WrappedArray[Double]](0)(lengthInitial + 
lengthBand - 1) == 0.14))
-    }
-  }
-
   describe("Should pass all raster function tests") {
 
     it("Passed RS_FromGeoTiff should handle null values") {
diff --git 
a/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 4352e8182..8ca039262 100644
--- 
a/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1 @@
-org.apache.spark.sql.sedona_sql.io.raster.GeotiffFileFormat
-org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
-org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
\ No newline at end of file
diff --git 
a/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
 
b/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
deleted file mode 100644
index f3360ae3d..000000000
--- 
a/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.spark.sql.sedona_sql.io.raster
-
-import com.google.common.io.{ByteStreams, Closeables}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.sedona.sql.utils.GeometrySerializer
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, 
OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.SerializableConfiguration
-import org.geotools.coverage.CoverageFactoryFinder
-import org.geotools.coverage.grid.io.{AbstractGridFormat, GridFormatFinder}
-import org.geotools.gce.geotiff.{GeoTiffFormat, GeoTiffWriteParams, 
GeoTiffWriter}
-import org.geotools.geometry.jts.ReferencedEnvelope
-import org.geotools.referencing.CRS
-import org.geotools.util.factory.Hints
-import org.locationtech.jts.geom.{Coordinate, Polygon}
-import org.locationtech.jts.io.WKTReader
-import org.opengis.parameter.GeneralParameterValue
-
-import java.awt.image.DataBuffer
-import java.io.IOException
-import java.nio.file.Paths
-import javax.imageio.ImageWriteParam
-import javax.media.jai.RasterFactory
-
-private[spark] class GeotiffFileFormat extends FileFormat with 
DataSourceRegister {
-
-  override def inferSchema(
-                            sparkSession: SparkSession,
-                            options: Map[String, String],
-                            files: Seq[FileStatus]): Option[StructType] = 
Some(GeotiffSchema.imageSchema)
-
-  override def prepareWrite(
-                             sparkSession: SparkSession,
-                             job: Job,
-                             options: Map[String, String],
-                             dataSchema: StructType): OutputWriterFactory = {
-    val imageWriteOptions = new ImageWriteOptions(options)
-    if (!isValidGeoTiffSchema(imageWriteOptions, dataSchema)) {
-      throw new IllegalArgumentException("Invalid GeoTiff Schema")
-    }
-
-    new OutputWriterFactory {
-      override def getFileExtension(context: TaskAttemptContext): String = ""
-
-      override def newInstance(path: String, dataSchema: StructType, context: 
TaskAttemptContext): OutputWriter = {
-        new GeotiffFileWriter(path, imageWriteOptions, dataSchema, context)
-      }
-    }
-  }
-
-  override def shortName(): String = "geotiff"
-
-  override protected def buildReader(
-                                      sparkSession: SparkSession,
-                                      dataSchema: StructType,
-                                      partitionSchema: StructType,
-                                      requiredSchema: StructType,
-                                      filters: Seq[Filter],
-                                      options: Map[String, String],
-                                      hadoopConf: Configuration): 
(PartitionedFile) => Iterator[InternalRow] = {
-    assert(
-      requiredSchema.length <= 1,
-      "Image data source only produces a single data column named \"image\".")
-
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-
-    val imageSourceOptions = new ImageReadOptions(options)
-
-    (file: PartitionedFile) => {
-      val emptyUnsafeRow = new UnsafeRow(0)
-      if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
-        Iterator(emptyUnsafeRow)
-      } else {
-        val origin = file.filePath
-        val path = new Path(origin)
-        val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
-        val stream = fs.open(path)
-        val bytes = try {
-          ByteStreams.toByteArray(stream)
-        } finally {
-          Closeables.close(stream, true)
-        }
-
-        val resultOpt = GeotiffSchema.decode(origin, bytes, imageSourceOptions)
-        val filteredResult = if (imageSourceOptions.dropInvalid) {
-          resultOpt.toIterator
-        } else {
-          Iterator(resultOpt.getOrElse(GeotiffSchema.invalidImageRow(origin)))
-        }
-
-        if (requiredSchema.isEmpty) {
-          filteredResult.map(_ => emptyUnsafeRow)
-        } else {
-          val converter = RowEncoder(requiredSchema).createSerializer() // 
SPARK3 anchor
-           filteredResult.map(row => converter(row)) // SPARK3 anchor
-//          val converter = RowEncoder(requiredSchema) // SPARK2 anchor
-//          filteredResult.map(row => converter.toRow(row)) // SPARK2 anchor
-        }
-      }
-    }
-  }
-
-  private def isValidGeoTiffSchema(imageWriteOptions: ImageWriteOptions, 
dataSchema: StructType): Boolean = {
-    val fields = dataSchema.fieldNames
-    if (fields.contains(imageWriteOptions.colImage) ){
-      val schemaFields = 
dataSchema.fields(dataSchema.fieldIndex(imageWriteOptions.colImage)).dataType.asInstanceOf[StructType]
-      if (schemaFields.fieldNames.length != 6) return false
-    }
-    else {
-      if (fields.length != 6) return false
-    }
-    true
-  }
-
-}
-
-// class for writing geoTiff images
-private class GeotiffFileWriter(savePath: String,
-                                imageWriteOptions: ImageWriteOptions,
-                                dataSchema: StructType,
-                                context: TaskAttemptContext) extends 
OutputWriter {
-
-  // set writing parameters
-  private val DEFAULT_WRITE_PARAMS: GeoTiffWriteParams = new 
GeoTiffWriteParams()
-  DEFAULT_WRITE_PARAMS.setCompressionMode(ImageWriteParam.MODE_EXPLICIT)
-  DEFAULT_WRITE_PARAMS.setCompressionType("LZW")
-  DEFAULT_WRITE_PARAMS.setCompressionQuality(0.75F)
-  DEFAULT_WRITE_PARAMS.setTilingMode(ImageWriteParam.MODE_EXPLICIT)
-  DEFAULT_WRITE_PARAMS.setTiling(512, 512)
-
-  private val hfs = new Path(savePath).getFileSystem(context.getConfiguration)
-
-  override def write(row: InternalRow): Unit = {
-    // retrieving the metadata of a geotiff image
-    var rowFields: InternalRow = row
-    var schemaFields: StructType = dataSchema
-    val fields = dataSchema.fieldNames
-
-    if (fields.contains(imageWriteOptions.colImage)) {
-      schemaFields = 
dataSchema.fields(dataSchema.fieldIndex(imageWriteOptions.colImage)).dataType.asInstanceOf[StructType]
-      rowFields = 
row.getStruct(dataSchema.fieldIndex(imageWriteOptions.colImage), 6)
-    }
-
-    val tiffOrigin = 
rowFields.getString(schemaFields.fieldIndex(imageWriteOptions.colOrigin))
-    val tiffBands = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colBands))
-    val tiffWidth = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colWidth))
-    val tiffHeight = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colHeight))
-    val tiffGeometry = 
Row.fromSeq(rowFields.toSeq(schemaFields)).get(schemaFields.fieldIndex(imageWriteOptions.colGeometry))
-    val tiffData = 
rowFields.getArray(schemaFields.fieldIndex(imageWriteOptions.colData)).toDoubleArray()
-
-    // if an image is invalid, fields are -1 and data array is empty. Skip 
writing that image
-    if (tiffBands == -1) return
-
-    // create a writable raster object
-    val raster = RasterFactory.createBandedRaster(DataBuffer.TYPE_DOUBLE, 
tiffWidth, tiffHeight, tiffBands, null)
-
-    // extract the pixels of the geotiff image and write to the writable raster
-    val pixelVal = Array.ofDim[Double](tiffBands)
-    for (i <- 0 until tiffHeight) {
-      for (j <- 0 until tiffWidth) {
-        for (k <- 0 until tiffBands) {
-          pixelVal(k) = tiffData(tiffHeight*tiffWidth*k + i * tiffWidth + j)
-        }
-        raster.setPixel(j, i, pixelVal)
-      }
-    }
-
-    // CRS is decoded to user-provided option "writeToCRS", default value is 
"EPSG:4326"
-    val crs = CRS.decode(imageWriteOptions.writeToCRS, true)
-
-    // Extract the geometry coordinates and set the envelop of the geotiff 
source
-    var coordinateList: Array[Coordinate] = null
-    if (tiffGeometry.isInstanceOf[UTF8String]) {
-      val wktReader = new WKTReader()
-      val envGeom = wktReader.read(tiffGeometry.toString).asInstanceOf[Polygon]
-      coordinateList = envGeom.getCoordinates()
-    } else {
-      val envGeom = 
GeometrySerializer.deserialize(tiffGeometry.asInstanceOf[Array[Byte]])
-      coordinateList = envGeom.getCoordinates()
-    }
-    val referencedEnvelope = new ReferencedEnvelope(coordinateList(0).x, 
coordinateList(2).x, coordinateList(0).y, coordinateList(2).y, crs)
-
-    // create the write path
-    val writePath = Paths.get(savePath, new Path(tiffOrigin).getName).toString
-    val out = hfs.create(new Path(writePath))
-
-    val format = GridFormatFinder.findFormat(out)
-    var hints: Hints = null
-    if (format.isInstanceOf[GeoTiffFormat]) {
-      hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
-    }
-
-    // create the writer object
-    val factory = CoverageFactoryFinder.getGridCoverageFactory(hints)
-    val gc = factory.create("GRID", raster, referencedEnvelope)
-    val writer = new GeoTiffWriter(out, hints)
-
-    val gtiffParams = new GeoTiffFormat().getWriteParameters
-    
gtiffParams.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName.toString).setValue(DEFAULT_WRITE_PARAMS)
-    val wps: Array[GeneralParameterValue] = gtiffParams.values.toArray(new 
Array[GeneralParameterValue](1))
-
-    // write the geotiff image to file
-    try {
-      writer.write(gc, wps)
-      writer.dispose()
-      out.close()
-    } catch {
-      case e@(_: IllegalArgumentException | _: IOException) =>
-        // TODO Auto-generated catch block
-        e.printStackTrace()
-    }
-  }
-
-  override def close(): Unit = {
-    hfs.close()
-  }
-
-  def path(): String = {
-    savePath
-  }
-}
diff --git 
a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala 
b/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
deleted file mode 100644
index 8a6b56ec0..000000000
--- a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sedona.sql
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.SaveMode
-import org.locationtech.jts.geom.Geometry
-import org.scalatest.{BeforeAndAfter, GivenWhenThen}
-
-import java.io.File
-import java.nio.file.Files
-import scala.collection.mutable
-
-class rasterIOTest extends TestBaseScala with BeforeAndAfter with 
GivenWhenThen {
-
-  var rasterdatalocation: String = resourceFolder + "raster/"
-  val tempDir: String = 
Files.createTempDirectory("sedona_raster_io_test_").toFile.getAbsolutePath
-
-  describe("Raster IO test") {
-    it("Should Pass geotiff loading without readFromCRS and readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON ((-13095782 
4021226.5, -13095782 3983905, -13058822 3983905, -13058822 4021226.5, -13095782 
4021226.5))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readFromCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readFromCRS", "EPSG:4499").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON ((-13095782 
4021226.5, -13095782 3983905, -13058822 3983905, -13058822 4021226.5, -13095782 
4021226.5))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readFromCRS and readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readFromCRS", "EPSG:4499").option("readToCRS", 
"EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with all read options") {
-      var df = sparkSession.read.format("geotiff")
-        .option("dropInvalid", true)
-        .option("readFromCRS", "EPSG:4499")
-        .option("readToCRS", "EPSG:4326")
-        .option("disableErrorInCRS", true)
-        .load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("should pass RS_GetBand") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 512 * 
517)
-    }
-
-    it("should pass RS_Base64") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr("image.origin as origin", 
"ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", "width", 
"height")
-      df.createOrReplaceTempView("geotiff")
-      df = sparkSession.sql("Select RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from 
geotiff")
-      assert(df.first().getAs[String](0).startsWith("iVBORw"))
-    }
-
-    it("should pass RS_HTML") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", 
"width","height")
-      df.createOrReplaceTempView("geotiff")
-      df = sparkSession.sql("Select RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from 
geotiff")
-      df = df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" )
-      assert(df.first().getAs[String](0).startsWith("<img 
src=\"data:image/png;base64,iVBORw"))
-      assert(df.first().getAs[String](0).endsWith("/>"))    }
-
-    it("should pass RS_GetBand for length of Band 2") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 2, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 32 * 
32)
-    }
-
-    it("should pass RS_GetBand for elements of Band 2") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 2, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0)(1) == 956.0)
-    }
-
-    it("should pass RS_GetBand for elements of Band 4") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 4, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0)(2) == 0.0)
-    }
-
-    it("Should Pass geotiff file writing with coalesce") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      df.coalesce(1).write.mode("overwrite").format("geotiff").save(savePath)
-
-      var loadPath = savePath
-      val tempFile = new File(loadPath)
-      val fileList = tempFile.listFiles()
-      for (i <- 0 until fileList.length) {
-        if (fileList(i).isDirectory) loadPath = fileList(i).getAbsolutePath
-      }
-
-      var dfWritten = 
sparkSession.read.format("geotiff").option("dropInvalid", true).load(loadPath)
-      dfWritten = dfWritten.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val rowFirst = dfWritten.first()
-      assert(rowFirst.getInt(2) == 517)
-      assert(rowFirst.getInt(3) == 512)
-      assert(rowFirst.getInt(5) == 1)
-
-      val blackBand = rowFirst.getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff file writing with writeToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      
df.coalesce(1).write.mode("overwrite").format("geotiff").option("writeToCRS", 
"EPSG:4499").save(savePath)
-
-      var loadPath = savePath
-      val tempFile = new File(loadPath)
-      val fileList = tempFile.listFiles()
-      for (i <- 0 until fileList.length) {
-        if (fileList(i).isDirectory) loadPath = fileList(i).getAbsolutePath
-      }
-
-      var dfWritten = 
sparkSession.read.format("geotiff").option("dropInvalid", true).load(loadPath)
-      dfWritten = dfWritten.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val rowFirst = dfWritten.first()
-      assert(rowFirst.getInt(2) == 517)
-      assert(rowFirst.getInt(3) == 512)
-      assert(rowFirst.getInt(5) == 1)
-
-      val blackBand = rowFirst.getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff file writing without coalesce") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      df.write.mode("overwrite").format("geotiff").save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with nested schema") {
-      val df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      val savePath = resourceFolder + "raster-written/"
-      df.write.mode("overwrite").format("geotiff").save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with renamed fields") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as source","image.geometry as geom", 
"image.height as height", "image.width as width", "image.data as data", 
"image.nBands as bands")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldOrigin", "source")
-        .option("fieldGeometry", "geom")
-        .option("fieldNBands", "bands")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with nested schema and renamed 
fields") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image as tiff_image")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldImage", "tiff_image")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with converted geometry") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
source","ST_GeomFromWkt(image.geometry) as geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldOrigin", "source")
-        .option("fieldGeometry", "geom")
-        .option("fieldNBands", "bands")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with handling invalid schema") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data")
-      val savePath = resourceFolder + "raster-written/"
-
-      try {
-        df.write
-          .mode("overwrite")
-          .format("geotiff")
-          .option("fieldImage", "tiff_image")
-          .save(savePath)
-      }
-      catch {
-        case e: IllegalArgumentException => {
-          assert(e.getMessage == "Invalid GeoTiff Schema")
-        }
-      }
-    }
-
-    it("should read geotiff using binary source and write geotiff back to disk 
using raster source") {
-      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
-      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should read and write geotiff using given options") {
-      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should read and write via RS_FromGeoTiff and RS_AsGeoTiff") {
-      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsGeoTiff(raster) as content", "path")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should handle null") {
-      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(null) as raster", 
"length").selectExpr("RS_AsGeoTiff(raster) as content", "length")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterCount == 3)
-      assert(rasterDf.count() == 0)
-    }
-
-    it("should read RS_FromGeoTiff and write RS_AsArcGrid") {
-      var df = sparkSession.read.format("binaryFile").load(resourceFolder + 
"raster_geotiff_color/*")
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsArcGrid(raster, 1) as content", "path")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".asc").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromArcInfoAsciiGrid(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-  }
-
-  override def afterAll(): Unit = FileUtils.deleteDirectory(new File(tempDir))
-}
\ No newline at end of file
diff --git 
a/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 4352e8182..8ca039262 100644
--- 
a/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1 @@
-org.apache.spark.sql.sedona_sql.io.raster.GeotiffFileFormat
-org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
-org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
\ No newline at end of file
+org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
\ No newline at end of file
diff --git 
a/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
 
b/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
deleted file mode 100644
index 318da263b..000000000
--- 
a/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-package org.apache.spark.sql.sedona_sql.io.raster
-
-import com.google.common.io.{ByteStreams, Closeables}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.sedona.sql.utils.GeometrySerializer
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, 
OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.SerializableConfiguration
-import org.geotools.coverage.CoverageFactoryFinder
-import org.geotools.coverage.grid.io.{AbstractGridFormat, GridFormatFinder}
-import org.geotools.gce.geotiff.{GeoTiffFormat, GeoTiffWriteParams, 
GeoTiffWriter}
-import org.geotools.geometry.jts.ReferencedEnvelope
-import org.geotools.referencing.CRS
-import org.geotools.util.factory.Hints
-import org.locationtech.jts.geom.{Coordinate, Polygon}
-import org.locationtech.jts.io.WKTReader
-import org.opengis.parameter.GeneralParameterValue
-
-import java.awt.image.DataBuffer
-import java.io.IOException
-import java.nio.file.Paths
-import javax.imageio.ImageWriteParam
-import javax.media.jai.RasterFactory
-
-private[spark] class GeotiffFileFormat extends FileFormat with 
DataSourceRegister {
-
-  override def inferSchema(
-                            sparkSession: SparkSession,
-                            options: Map[String, String],
-                            files: Seq[FileStatus]): Option[StructType] = 
Some(GeotiffSchema.imageSchema)
-
-  override def prepareWrite(
-                             sparkSession: SparkSession,
-                             job: Job,
-                             options: Map[String, String],
-                             dataSchema: StructType): OutputWriterFactory = {
-    val imageWriteOptions = new ImageWriteOptions(options)
-    if (!isValidGeoTiffSchema(imageWriteOptions, dataSchema)) {
-      throw new IllegalArgumentException("Invalid GeoTiff Schema")
-    }
-
-    new OutputWriterFactory {
-      override def getFileExtension(context: TaskAttemptContext): String = ""
-
-      override def newInstance(path: String, dataSchema: StructType, context: 
TaskAttemptContext): OutputWriter = {
-        new GeotiffFileWriter(path, imageWriteOptions, dataSchema, context)
-      }
-    }
-  }
-
-  override def shortName(): String = "geotiff"
-
-  override protected def buildReader(
-                                      sparkSession: SparkSession,
-                                      dataSchema: StructType,
-                                      partitionSchema: StructType,
-                                      requiredSchema: StructType,
-                                      filters: Seq[Filter],
-                                      options: Map[String, String],
-                                      hadoopConf: Configuration): 
(PartitionedFile) => Iterator[InternalRow] = {
-    assert(
-      requiredSchema.length <= 1,
-      "Image data source only produces a single data column named \"image\".")
-
-    val broadcastedHadoopConf =
-      sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
-
-    val imageSourceOptions = new ImageReadOptions(options)
-
-    (file: PartitionedFile) => {
-      val emptyUnsafeRow = new UnsafeRow(0)
-      if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
-        Iterator(emptyUnsafeRow)
-      } else {
-        val path = file.toPath
-        val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
-        val stream = fs.open(path)
-        val bytes = try {
-          ByteStreams.toByteArray(stream)
-        } finally {
-          Closeables.close(stream, true)
-        }
-
-        val resultOpt = GeotiffSchema.decode(path.toString, bytes, 
imageSourceOptions)
-        val filteredResult = if (imageSourceOptions.dropInvalid) {
-          resultOpt.toIterator
-        } else {
-          
Iterator(resultOpt.getOrElse(GeotiffSchema.invalidImageRow(path.toString)))
-        }
-
-        if (requiredSchema.isEmpty) {
-          filteredResult.map(_ => emptyUnsafeRow)
-        } else {
-          val converter = RowEncoder(requiredSchema).createSerializer() // 
SPARK3 anchor
-           filteredResult.map(row => converter(row)) // SPARK3 anchor
-//          val converter = RowEncoder(requiredSchema) // SPARK2 anchor
-//          filteredResult.map(row => converter.toRow(row)) // SPARK2 anchor
-        }
-      }
-    }
-  }
-
-  private def isValidGeoTiffSchema(imageWriteOptions: ImageWriteOptions, 
dataSchema: StructType): Boolean = {
-    val fields = dataSchema.fieldNames
-    if (fields.contains(imageWriteOptions.colImage) ){
-      val schemaFields = 
dataSchema.fields(dataSchema.fieldIndex(imageWriteOptions.colImage)).dataType.asInstanceOf[StructType]
-      if (schemaFields.fieldNames.length != 6) return false
-    }
-    else {
-      if (fields.length != 6) return false
-    }
-    true
-  }
-
-}
-
-// class for writing geoTiff images
-private class GeotiffFileWriter(savePath: String,
-                                imageWriteOptions: ImageWriteOptions,
-                                dataSchema: StructType,
-                                context: TaskAttemptContext) extends 
OutputWriter {
-
-  // set writing parameters
-  private val DEFAULT_WRITE_PARAMS: GeoTiffWriteParams = new 
GeoTiffWriteParams()
-  DEFAULT_WRITE_PARAMS.setCompressionMode(ImageWriteParam.MODE_EXPLICIT)
-  DEFAULT_WRITE_PARAMS.setCompressionType("LZW")
-  DEFAULT_WRITE_PARAMS.setCompressionQuality(0.75F)
-  DEFAULT_WRITE_PARAMS.setTilingMode(ImageWriteParam.MODE_EXPLICIT)
-  DEFAULT_WRITE_PARAMS.setTiling(512, 512)
-
-  private val hfs = new Path(savePath).getFileSystem(context.getConfiguration)
-
-  override def write(row: InternalRow): Unit = {
-    // retrieving the metadata of a geotiff image
-    var rowFields: InternalRow = row
-    var schemaFields: StructType = dataSchema
-    val fields = dataSchema.fieldNames
-
-    if (fields.contains(imageWriteOptions.colImage)) {
-      schemaFields = 
dataSchema.fields(dataSchema.fieldIndex(imageWriteOptions.colImage)).dataType.asInstanceOf[StructType]
-      rowFields = 
row.getStruct(dataSchema.fieldIndex(imageWriteOptions.colImage), 6)
-    }
-
-    val tiffOrigin = 
rowFields.getString(schemaFields.fieldIndex(imageWriteOptions.colOrigin))
-    val tiffBands = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colBands))
-    val tiffWidth = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colWidth))
-    val tiffHeight = 
rowFields.getInt(schemaFields.fieldIndex(imageWriteOptions.colHeight))
-    val tiffGeometry = 
Row.fromSeq(rowFields.toSeq(schemaFields)).get(schemaFields.fieldIndex(imageWriteOptions.colGeometry))
-    val tiffData = 
rowFields.getArray(schemaFields.fieldIndex(imageWriteOptions.colData)).toDoubleArray()
-
-    // if an image is invalid, fields are -1 and data array is empty. Skip 
writing that image
-    if (tiffBands == -1) return
-
-    // create a writable raster object
-    val raster = RasterFactory.createBandedRaster(DataBuffer.TYPE_DOUBLE, 
tiffWidth, tiffHeight, tiffBands, null)
-
-    // extract the pixels of the geotiff image and write to the writable raster
-    val pixelVal = Array.ofDim[Double](tiffBands)
-    for (i <- 0 until tiffHeight) {
-      for (j <- 0 until tiffWidth) {
-        for (k <- 0 until tiffBands) {
-          pixelVal(k) = tiffData(tiffHeight*tiffWidth*k + i * tiffWidth + j)
-        }
-        raster.setPixel(j, i, pixelVal)
-      }
-    }
-
-    // CRS is decoded to user-provided option "writeToCRS", default value is 
"EPSG:4326"
-    val crs = CRS.decode(imageWriteOptions.writeToCRS, true)
-
-    // Extract the geometry coordinates and set the envelop of the geotiff 
source
-    var coordinateList: Array[Coordinate] = null
-    if (tiffGeometry.isInstanceOf[UTF8String]) {
-      val wktReader = new WKTReader()
-      val envGeom = wktReader.read(tiffGeometry.toString).asInstanceOf[Polygon]
-      coordinateList = envGeom.getCoordinates()
-    } else {
-      val envGeom = 
GeometrySerializer.deserialize(tiffGeometry.asInstanceOf[Array[Byte]])
-      coordinateList = envGeom.getCoordinates()
-    }
-    val referencedEnvelope = new ReferencedEnvelope(coordinateList(0).x, 
coordinateList(2).x, coordinateList(0).y, coordinateList(2).y, crs)
-
-    // create the write path
-    val writePath = Paths.get(savePath, new Path(tiffOrigin).getName).toString
-    val out = hfs.create(new Path(writePath))
-
-    val format = GridFormatFinder.findFormat(out)
-    var hints: Hints = null
-    if (format.isInstanceOf[GeoTiffFormat]) {
-      hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
-    }
-
-    // create the writer object
-    val factory = CoverageFactoryFinder.getGridCoverageFactory(hints)
-    val gc = factory.create("GRID", raster, referencedEnvelope)
-    val writer = new GeoTiffWriter(out, hints)
-
-    val gtiffParams = new GeoTiffFormat().getWriteParameters
-    
gtiffParams.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName.toString).setValue(DEFAULT_WRITE_PARAMS)
-    val wps: Array[GeneralParameterValue] = gtiffParams.values.toArray(new 
Array[GeneralParameterValue](1))
-
-    // write the geotiff image to file
-    try {
-      writer.write(gc, wps)
-      writer.dispose()
-      out.close()
-    } catch {
-      case e@(_: IllegalArgumentException | _: IOException) =>
-        // TODO Auto-generated catch block
-        e.printStackTrace()
-    }
-  }
-
-  override def close(): Unit = {
-    hfs.close()
-  }
-
-  def path(): String = {
-    savePath
-  }
-}
diff --git 
a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala 
b/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
deleted file mode 100644
index 8a6b56ec0..000000000
--- a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sedona.sql
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.SaveMode
-import org.locationtech.jts.geom.Geometry
-import org.scalatest.{BeforeAndAfter, GivenWhenThen}
-
-import java.io.File
-import java.nio.file.Files
-import scala.collection.mutable
-
-class rasterIOTest extends TestBaseScala with BeforeAndAfter with 
GivenWhenThen {
-
-  var rasterdatalocation: String = resourceFolder + "raster/"
-  val tempDir: String = 
Files.createTempDirectory("sedona_raster_io_test_").toFile.getAbsolutePath
-
-  describe("Raster IO test") {
-    it("Should Pass geotiff loading without readFromCRS and readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON ((-13095782 
4021226.5, -13095782 3983905, -13058822 3983905, -13058822 4021226.5, -13095782 
4021226.5))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readFromCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readFromCRS", "EPSG:4499").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON ((-13095782 
4021226.5, -13095782 3983905, -13058822 3983905, -13058822 4021226.5, -13095782 
4021226.5))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with readFromCRS and readToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readFromCRS", "EPSG:4499").option("readToCRS", 
"EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff loading with all read options") {
-      var df = sparkSession.read.format("geotiff")
-        .option("dropInvalid", true)
-        .option("readFromCRS", "EPSG:4499")
-        .option("readToCRS", "EPSG:4326")
-        .option("disableErrorInCRS", true)
-        .load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      assert(df.first().getAs[Geometry](1).toText == "POLYGON 
((-117.64141128097314 33.94356351407699, -117.64141128097314 
33.664978146501284, -117.30939395196258 33.664978146501284," +
-        " -117.30939395196258 33.94356351407699, -117.64141128097314 
33.94356351407699))")
-      assert(df.first().getInt(2) == 517)
-      assert(df.first().getInt(3) == 512)
-      assert(df.first().getInt(5) == 1)
-      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("should pass RS_GetBand") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 512 * 
517)
-    }
-
-    it("should pass RS_Base64") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr("image.origin as origin", 
"ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", "width", 
"height")
-      df.createOrReplaceTempView("geotiff")
-      df = sparkSession.sql("Select RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from 
geotiff")
-      assert(df.first().getAs[String](0).startsWith("iVBORw"))
-    }
-
-    it("should pass RS_HTML") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/")
-      df = df.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", 
"width","height")
-      df.createOrReplaceTempView("geotiff")
-      df = sparkSession.sql("Select RS_base64(height, width, targetBand, 
RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from 
geotiff")
-      df = df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" )
-      assert(df.first().getAs[String](0).startsWith("<img 
src=\"data:image/png;base64,iVBORw"))
-      assert(df.first().getAs[String](0).endsWith("/>"))    }
-
-    it("should pass RS_GetBand for length of Band 2") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 2, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 32 * 
32)
-    }
-
-    it("should pass RS_GetBand for elements of Band 2") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 2, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0)(1) == 956.0)
-    }
-
-    it("should pass RS_GetBand for elements of Band 4") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(resourceFolder + "raster/test3.tif")
-      df = df.selectExpr(" image.data as data", "image.nBands as bands")
-      df = df.selectExpr("RS_GetBand(data, 4, bands) as targetBand")
-      assert(df.first().getAs[mutable.WrappedArray[Double]](0)(2) == 0.0)
-    }
-
-    it("Should Pass geotiff file writing with coalesce") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      df.coalesce(1).write.mode("overwrite").format("geotiff").save(savePath)
-
-      var loadPath = savePath
-      val tempFile = new File(loadPath)
-      val fileList = tempFile.listFiles()
-      for (i <- 0 until fileList.length) {
-        if (fileList(i).isDirectory) loadPath = fileList(i).getAbsolutePath
-      }
-
-      var dfWritten = 
sparkSession.read.format("geotiff").option("dropInvalid", true).load(loadPath)
-      dfWritten = dfWritten.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val rowFirst = dfWritten.first()
-      assert(rowFirst.getInt(2) == 517)
-      assert(rowFirst.getInt(3) == 512)
-      assert(rowFirst.getInt(5) == 1)
-
-      val blackBand = rowFirst.getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff file writing with writeToCRS") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      
df.coalesce(1).write.mode("overwrite").format("geotiff").option("writeToCRS", 
"EPSG:4499").save(savePath)
-
-      var loadPath = savePath
-      val tempFile = new File(loadPath)
-      val fileList = tempFile.listFiles()
-      for (i <- 0 until fileList.length) {
-        if (fileList(i).isDirectory) loadPath = fileList(i).getAbsolutePath
-      }
-
-      var dfWritten = 
sparkSession.read.format("geotiff").option("dropInvalid", true).load(loadPath)
-      dfWritten = dfWritten.selectExpr("image.origin as 
origin","ST_GeomFromWkt(image.geometry) as Geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val rowFirst = dfWritten.first()
-      assert(rowFirst.getInt(2) == 517)
-      assert(rowFirst.getInt(3) == 512)
-      assert(rowFirst.getInt(5) == 1)
-
-      val blackBand = rowFirst.getAs[mutable.WrappedArray[Double]](4)
-      val line1 = blackBand.slice(0, 512)
-      val line2 = blackBand.slice(512, 1024)
-      assert(line1(0) == 0.0) // The first value at line 1 is black
-      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, 
value at 159 is black and at 160 is not black
-    }
-
-    it("Should Pass geotiff file writing without coalesce") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data", "image.nBands as nBands")
-      val savePath = tempDir + "/raster-written/"
-      df.write.mode("overwrite").format("geotiff").save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with nested schema") {
-      val df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      val savePath = resourceFolder + "raster-written/"
-      df.write.mode("overwrite").format("geotiff").save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with renamed fields") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as source","image.geometry as geom", 
"image.height as height", "image.width as width", "image.data as data", 
"image.nBands as bands")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldOrigin", "source")
-        .option("fieldGeometry", "geom")
-        .option("fieldNBands", "bands")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with nested schema and renamed 
fields") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image as tiff_image")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldImage", "tiff_image")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with converted geometry") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as 
source","ST_GeomFromWkt(image.geometry) as geom", "image.height as height", 
"image.width as width", "image.data as data", "image.nBands as bands")
-      val savePath = resourceFolder + "raster-written/"
-      df.write
-        .mode("overwrite")
-        .format("geotiff")
-        .option("fieldOrigin", "source")
-        .option("fieldGeometry", "geom")
-        .option("fieldNBands", "bands")
-        .save(savePath)
-
-      var imageCount = 0
-      def getFile(loadPath: String): Unit ={
-        val tempFile = new File(loadPath)
-        val fileList = tempFile.listFiles()
-        if (fileList == null) return
-        for (i <- 0 until fileList.length) {
-          if (fileList(i).isDirectory) getFile(fileList(i).getAbsolutePath)
-          else if (fileList(i).getAbsolutePath.endsWith(".tiff") || 
fileList(i).getAbsolutePath.endsWith(".tif")) imageCount += 1
-        }
-      }
-
-      getFile(savePath)
-      assert(imageCount == 3)
-    }
-
-    it("Should Pass geotiff file writing with handling invalid schema") {
-      var df = sparkSession.read.format("geotiff").option("dropInvalid", 
true).load(rasterdatalocation)
-      df = df.selectExpr("image.origin as origin","image.geometry as 
geometry", "image.height as height", "image.width as width", "image.data as 
data")
-      val savePath = resourceFolder + "raster-written/"
-
-      try {
-        df.write
-          .mode("overwrite")
-          .format("geotiff")
-          .option("fieldImage", "tiff_image")
-          .save(savePath)
-      }
-      catch {
-        case e: IllegalArgumentException => {
-          assert(e.getMessage == "Invalid GeoTiff Schema")
-        }
-      }
-    }
-
-    it("should read geotiff using binary source and write geotiff back to disk 
using raster source") {
-      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
-      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should read and write geotiff using given options") {
-      var rasterDf = 
sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      rasterDf = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = rasterDf.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should read and write via RS_FromGeoTiff and RS_AsGeoTiff") {
-      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsGeoTiff(raster) as content", "path")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".tiff").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-
-    it("should handle null") {
-      var df = sparkSession.read.format("binaryFile").load(rasterdatalocation)
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(null) as raster", 
"length").selectExpr("RS_AsGeoTiff(raster) as content", "length")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + 
"/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromGeoTiff(content)")
-      assert(rasterCount == 3)
-      assert(rasterDf.count() == 0)
-    }
-
-    it("should read RS_FromGeoTiff and write RS_AsArcGrid") {
-      var df = sparkSession.read.format("binaryFile").load(resourceFolder + 
"raster_geotiff_color/*")
-      var rasterDf = df.selectExpr("RS_FromGeoTiff(content) as raster", 
"path").selectExpr("RS_AsArcGrid(raster, 1) as content", "path")
-      val rasterCount = rasterDf.count()
-      rasterDf.write.format("raster").option("rasterField", 
"content").option("fileExtension", ".asc").option("pathField", 
"path").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
-      df = sparkSession.read.format("binaryFile").load(tempDir + 
"/raster-written/*")
-      rasterDf = df.selectExpr("RS_FromArcInfoAsciiGrid(content)")
-      assert(rasterDf.count() == rasterCount)
-    }
-  }
-
-  override def afterAll(): Unit = FileUtils.deleteDirectory(new File(tempDir))
-}
\ No newline at end of file


Reply via email to