spark git commit: [SPARK-26189][R] Fix unionAll doc in SparkR
Repository: spark Updated Branches: refs/heads/master 28d337440 -> 2f6e88fec [SPARK-26189][R] Fix unionAll doc in SparkR ## What changes were proposed in this pull request? Fix unionAll doc in SparkR ## How was this patch tested? Manually ran test Author: Huaxin Gao Closes #23161 from huaxingao/spark-26189. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f6e88fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f6e88fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f6e88fe Branch: refs/heads/master Commit: 2f6e88fecb455a02c4c08c41290e2f338e979543 Parents: 28d3374 Author: Huaxin Gao Authored: Fri Nov 30 23:14:05 2018 -0800 Committer: Felix Cheung Committed: Fri Nov 30 23:14:05 2018 -0800 -- R/pkg/R/DataFrame.R | 20 R/pkg/R/generics.R | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f6e88fe/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 745bb3e..24ed449 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2730,13 +2730,25 @@ setMethod("union", dataFrame(unioned) }) -#' Return a new SparkDataFrame containing the union of rows +#' Return a new SparkDataFrame containing the union of rows. #' -#' This is an alias for `union`. +#' This is an alias for \code{union}. #' -#' @rdname union -#' @name unionAll +#' @param x a SparkDataFrame. +#' @param y a SparkDataFrame. +#' @return A SparkDataFrame containing the result of the unionAll operation. 
+#' @family SparkDataFrame functions #' @aliases unionAll,SparkDataFrame,SparkDataFrame-method +#' @rdname unionAll +#' @name unionAll +#' @seealso \link{union} +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df1 <- read.json(path) +#' df2 <- read.json(path2) +#' unionAllDF <- unionAll(df1, df2) +#' } #' @note unionAll since 1.4.0 setMethod("unionAll", signature(x = "SparkDataFrame", y = "SparkDataFrame"), http://git-wip-us.apache.org/repos/asf/spark/blob/2f6e88fe/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 9d8c24c..eed7646 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -631,7 +631,7 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") }) #' @rdname union setGeneric("union", function(x, y) { standardGeneric("union") }) -#' @rdname union +#' @rdname unionAll setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") }) #' @rdname unionByName - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31280 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_30_21_44-28d3374-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Dec 1 05:56:51 2018 New Revision: 31280 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_30_21_44-28d3374 docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23647][PYTHON][SQL] Adds more types for hint in pyspark
Repository: spark Updated Branches: refs/heads/master 6be272b75 -> 28d337440 [SPARK-23647][PYTHON][SQL] Adds more types for hint in pyspark Signed-off-by: DylanGuedes ## What changes were proposed in this pull request? Addition of float, int and list hints for `pyspark.sql` Hint. ## How was this patch tested? I did manual tests following the same principles used in the Scala version, and also added unit tests. Closes #20788 from DylanGuedes/jira-21030. Authored-by: DylanGuedes Signed-off-by: Hyukjin Kwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28d33744 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28d33744 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28d33744 Branch: refs/heads/master Commit: 28d33744076abd8bf7955eefcbdeef4849a99c40 Parents: 6be272b Author: DylanGuedes Authored: Sat Dec 1 10:37:03 2018 +0800 Committer: Hyukjin Kwon Committed: Sat Dec 1 10:37:03 2018 +0800 -- python/pyspark/sql/dataframe.py| 6 -- python/pyspark/sql/tests/test_dataframe.py | 13 + 2 files changed, 17 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/28d33744/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index b8833a3..1b1092c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -485,10 +485,12 @@ class DataFrame(object): if not isinstance(name, str): raise TypeError("name should be provided as str, got {0}".format(type(name))) +allowed_types = (basestring, list, float, int) for p in parameters: -if not isinstance(p, str): +if not isinstance(p, allowed_types): raise TypeError( -"all parameters should be str, got {0} of type {1}".format(p, type(p))) +"all parameters should be in {0}, got {1} of type {2}".format( +allowed_types, p, type(p))) jdf = self._jdf.hint(name, self._jseq(parameters)) return DataFrame(jdf, self.sql_ctx) 
http://git-wip-us.apache.org/repos/asf/spark/blob/28d33744/python/pyspark/sql/tests/test_dataframe.py -- diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 908d400..65edf59 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -375,6 +375,19 @@ class DataFrameTests(ReusedSQLTestCase): plan = df1.join(df2.hint("broadcast"), "id")._jdf.queryExecution().executedPlan() self.assertEqual(1, plan.toString().count("BroadcastHashJoin")) +# add tests for SPARK-23647 (test more types for hint) +def test_extended_hint_types(self): +from pyspark.sql import DataFrame + +df = self.spark.range(10e10).toDF("id") +such_a_nice_list = ["itworks1", "itworks2", "itworks3"] +hinted_df = df.hint("my awesome hint", 1.2345, "what", such_a_nice_list) +logical_plan = hinted_df._jdf.queryExecution().logical() + +self.assertEqual(1, logical_plan.toString().count("1.2345")) +self.assertEqual(1, logical_plan.toString().count("what")) +self.assertEqual(3, logical_plan.toString().count("itworks")) + def test_sample(self): self.assertRaisesRegexp( TypeError, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31279 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_30_17_40-6be272b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Dec 1 01:52:30 2018 New Revision: 31279 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_30_17_40-6be272b docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[2/3] spark git commit: [SPARK-25876][K8S] Simplify kubernetes configuration types.
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 167fb40..a5ad972 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -21,57 +21,46 @@ import java.io.File import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ private[spark] class KubernetesDriverBuilder( -provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = +provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) = new BasicDriverFeatureStep(_), -provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf]) - => DriverKubernetesCredentialsFeatureStep = +provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) = new DriverKubernetesCredentialsFeatureStep(_), -provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep = +provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) = new DriverServiceFeatureStep(_), -provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => MountSecretsFeatureStep) = +provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = new MountSecretsFeatureStep(_), -provideEnvSecretsStep: 
(KubernetesConf[_ <: KubernetesRoleSpecificConf] - => EnvSecretsFeatureStep) = +provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = new EnvSecretsFeatureStep(_), -provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) - => LocalDirsFeatureStep = +provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) = new LocalDirsFeatureStep(_), -provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => MountVolumesFeatureStep) = +provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), -provideDriverCommandStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => DriverCommandFeatureStep) = +provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) = new DriverCommandFeatureStep(_), -provideHadoopGlobalStep: ( - KubernetesConf[KubernetesDriverSpecificConf] -=> KerberosConfDriverFeatureStep) = -new KerberosConfDriverFeatureStep(_), -providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => PodTemplateConfigMapStep) = -new PodTemplateConfigMapStep(_), -provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) { +provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) = + new KerberosConfDriverFeatureStep(_), +providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) = + new PodTemplateConfigMapStep(_), +provideInitialPod: () => SparkPod = () => SparkPod.initialPod) { - def buildFromFeatures( -kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { + def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = { val baseFeatures = Seq( provideBasicStep(kubernetesConf), provideCredentialsStep(kubernetesConf), provideServiceStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) -val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { +val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) 
{ Seq(provideSecretsStep(kubernetesConf)) } else Nil -val envSecretFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { +val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) { Seq(provideEnvSecretsStep(kubernetesConf)) } else Nil -val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { +val volumesFeature = if (kubernetesConf.volumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil val podTemplateFeature = if ( @@ -81,14 +70,12 @@ private[spark] class KubernetesDriverBuilder( val driverCommandStep =
[1/3] spark git commit: [SPARK-25876][K8S] Simplify kubernetes configuration types.
Repository: spark Updated Branches: refs/heads/master 8856e9f6a -> 6be272b75 http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 3708864..7e7dc47 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -83,48 +83,21 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => templateVolumeStep) test("Apply fundamental steps all the time.") { -val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( -JavaMainAppResource(Some("example.jar")), -"test-app", -"main", -Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) +val conf = KubernetesTestConf.createDriverConf() validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply secrets step if secrets are present.") { -val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( -JavaMainAppResource(None), -"test-app", -"main", -Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map("secret" -> "secretMountPath"), - Map("EnvName" -> "SecretName:secretKey"), - Map.empty, - Nil, - hadoopConfSpec = None) +val conf = KubernetesTestConf.createDriverConf( + secretEnvNamesToKeyRefs = Map("EnvName" -> "SecretName:secretKey"), + 
secretNamesToMountPaths = Map("secret" -> "secretMountPath")) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -133,7 +106,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, ENV_SECRETS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply volumes step if mounts are present.") { @@ -143,22 +117,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { "", false, KubernetesHostPathVolumeConf("/path")) -val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( -JavaMainAppResource(None), -"test-app", -"main", -Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - volumeSpec :: Nil, - hadoopConfSpec = None) +val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -166,7 +125,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply volumes step if a mount subpath is present.") { @@ -176,22 +136,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { "foo", false, KubernetesHostPathVolumeConf("/path")) -val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( -JavaMainAppResource(None), -"test-app", -"main", -Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - volumeSpec :: Nil, - hadoopConfSpec = None) +val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -199,89 +144,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - 
DRIVER_CMD_STEP_TYPE) - } - - test("Apply template volume step if executor template is present.") { -val sparkConf = spy(new SparkConf(false)) -doReturn(Option("filename")).when(sparkConf) - .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) -
[3/3] spark git commit: [SPARK-25876][K8S] Simplify kubernetes configuration types.
[SPARK-25876][K8S] Simplify kubernetes configuration types. There are a few issues with the current configuration types used in the kubernetes backend: - they use type parameters for role-specific specialization, which makes type signatures really noisy throughout the code base. - they break encapsulation by forcing the code that creates the config object to remove the configuration from SparkConf before creating the k8s-specific wrapper. - they don't provide an easy way for tests to have default values for fields they do not use. This change fixes those problems by: - creating a base config type with role-specific specialization using inheritance - encapsulating the logic of parsing SparkConf into k8s-specific views inside the k8s config classes - providing some helper code for tests to easily override just the part of the configs they want. Most of the change relates to the above, especially cleaning up the tests. While doing that, I also made some smaller changes elsewhere: - removed unnecessary type parameters in KubernetesVolumeSpec - simplified the error detection logic in KubernetesVolumeUtils; all the call sites would just throw the first exception collected by that class, since they all called "get" on the "Try" object. Now the unnecessary wrapping is gone and the exception is just thrown where it occurs. - removed a lot of unnecessary mocking from tests. - changed the kerberos-related code so that less logic needs to live in the driver builder. In spirit it should be part of the upcoming work in this series of cleanups, but it made parts of this change simpler. Tested with existing unit tests and integration tests. Author: Marcelo Vanzin Closes #22959 from vanzin/SPARK-25876. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be272b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be272b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be272b7 Branch: refs/heads/master Commit: 6be272b75b4ae3149869e19df193675cc4117763 Parents: 8856e9f Author: Marcelo Vanzin Authored: Fri Nov 30 16:23:37 2018 -0800 Committer: mcheah Committed: Fri Nov 30 16:23:37 2018 -0800 -- .../org/apache/spark/deploy/k8s/Config.scala| 17 +- .../spark/deploy/k8s/KubernetesConf.scala | 302 +-- .../spark/deploy/k8s/KubernetesVolumeSpec.scala | 10 +- .../deploy/k8s/KubernetesVolumeUtils.scala | 53 +--- .../k8s/features/BasicDriverFeatureStep.scala | 24 +- .../k8s/features/BasicExecutorFeatureStep.scala | 29 +- .../k8s/features/DriverCommandFeatureStep.scala | 22 +- ...DriverKubernetesCredentialsFeatureStep.scala | 6 +- .../k8s/features/DriverServiceFeatureStep.scala | 10 +- .../k8s/features/EnvSecretsFeatureStep.scala| 11 +- .../HadoopConfExecutorFeatureStep.scala | 14 +- .../HadoopSparkUserExecutorFeatureStep.scala| 17 +- .../KerberosConfDriverFeatureStep.scala | 113 --- .../KerberosConfExecutorFeatureStep.scala | 21 +- .../k8s/features/LocalDirsFeatureStep.scala | 9 +- .../k8s/features/MountSecretsFeatureStep.scala | 13 +- .../k8s/features/MountVolumesFeatureStep.scala | 11 +- .../k8s/features/PodTemplateConfigMapStep.scala | 5 +- .../hadooputils/HadoopKerberosLogin.scala | 64 ...KubernetesHadoopDelegationTokenManager.scala | 37 --- .../submit/KubernetesClientApplication.scala| 61 +--- .../k8s/submit/KubernetesDriverBuilder.scala| 53 ++-- .../cluster/k8s/KubernetesExecutorBuilder.scala | 36 +-- .../spark/deploy/k8s/KubernetesConfSuite.scala | 71 ++--- .../spark/deploy/k8s/KubernetesTestConf.scala | 138 + .../deploy/k8s/KubernetesVolumeUtilsSuite.scala | 30 +- .../features/BasicDriverFeatureStepSuite.scala | 127 ++-- .../BasicExecutorFeatureStepSuite.scala | 103 ++- 
.../DriverCommandFeatureStepSuite.scala | 29 +- ...rKubernetesCredentialsFeatureStepSuite.scala | 69 + .../DriverServiceFeatureStepSuite.scala | 193 .../features/EnvSecretsFeatureStepSuite.scala | 32 +- .../features/LocalDirsFeatureStepSuite.scala| 46 +-- .../features/MountSecretsFeatureStepSuite.scala | 21 +- .../features/MountVolumesFeatureStepSuite.scala | 56 ++-- .../PodTemplateConfigMapStepSuite.scala | 28 +- .../spark/deploy/k8s/submit/ClientSuite.scala | 47 +-- .../submit/KubernetesDriverBuilderSuite.scala | 204 ++--- .../k8s/ExecutorPodsAllocatorSuite.scala| 43 +-- .../k8s/KubernetesExecutorBuilderSuite.scala| 114 ++- 40 files changed, 777 insertions(+), 1512 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
spark git commit: [SPARK-26219][CORE] Executor summary should get updated for failure jobs in the history server UI
Repository: spark Updated Branches: refs/heads/master 36edbac1c -> 8856e9f6a [SPARK-26219][CORE] Executor summary should get updated for failure jobs in the history server UI The root cause of the problem is, whenever the taskEnd event comes after stageCompleted event, execSummary is updating only for live UI. we need to update for history UI too. To see the previous discussion, refer: PR for https://github.com/apache/spark/pull/23038, https://issues.apache.org/jira/browse/SPARK-26100. Added UT. Manually verified Test step to reproduce: ``` bin/spark-shell --master yarn --conf spark.executor.instances=3 sc.parallelize(1 to 1, 10).map{ x => throw new RuntimeException("Bad executor")}.collect() ``` Open Executors page from the History UI Before patch: ![screenshot from 2018-11-29 22-13-34](https://user-images.githubusercontent.com/23054875/49246338-a21ead00-f43a-11e8-8214-f1020420be52.png) After patch: ![screenshot from 2018-11-30 00-54-49](https://user-images.githubusercontent.com/23054875/49246353-aa76e800-f43a-11e8-98ef-7faecaa7a50e.png) Closes #23181 from shahidki31/executorUpdate. 
Authored-by: Shahid Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8856e9f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8856e9f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8856e9f6 Branch: refs/heads/master Commit: 8856e9f6a3d5c019fcae45dbbdfa9128cd700e19 Parents: 36edbac Author: Shahid Authored: Fri Nov 30 15:20:05 2018 -0800 Committer: Marcelo Vanzin Committed: Fri Nov 30 15:22:37 2018 -0800 -- .../apache/spark/status/AppStatusListener.scala | 19 ++-- .../spark/status/AppStatusListenerSuite.scala | 92 2 files changed, 64 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8856e9f6/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 8e84557..bd3f58b 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -641,9 +641,14 @@ private[spark] class AppStatusListener( } } - // Force an update on live applications when the number of active tasks reaches 0. This is - // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date. - conditionalLiveUpdate(exec, now, exec.activeTasks == 0) + // Force an update on both live and history applications when the number of active tasks + // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be + // reliably up to date. 
+ if (exec.activeTasks == 0) { +update(exec, now) + } else { +maybeUpdate(exec, now) + } } } @@ -1024,14 +1029,6 @@ private[spark] class AppStatusListener( } } - private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = { -if (condition) { - liveUpdate(entity, now) -} else { - maybeUpdate(entity, now) -} - } - private def cleanupExecutors(count: Long): Unit = { // Because the limit is on the number of *dead* executors, we need to calculate whether // there are actually enough dead executors to be deleted. http://git-wip-us.apache.org/repos/asf/spark/blob/8856e9f6/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 7860a0d..61fec8c 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1273,48 +1273,68 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(allJobs.head.numFailedStages == 1) } - test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { -val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + Seq(true, false).foreach { live => +test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") { -val listener = new AppStatusListener(store, testConf, true) + val testConf = if (live) { +conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + } else { +conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L) + } -val
spark git commit: [SPARK-26226][SQL] Update query tracker to report timeline for phases
Repository: spark Updated Branches: refs/heads/master 9b23be2e9 -> 36edbac1c [SPARK-26226][SQL] Update query tracker to report timeline for phases ## What changes were proposed in this pull request? This patch changes the query plan tracker added earlier to report phase timeline, rather than just a duration for each phase. This way, we can easily find time that's unaccounted for. ## How was this patch tested? Updated test cases to reflect that. Closes #23183 from rxin/SPARK-26226. Authored-by: Reynold Xin Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36edbac1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36edbac1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36edbac1 Branch: refs/heads/master Commit: 36edbac1c8337a4719f90e4abd58d38738b2e1fb Parents: 9b23be2 Author: Reynold Xin Authored: Fri Nov 30 14:23:18 2018 -0800 Committer: gatorsmile Committed: Fri Nov 30 14:23:18 2018 -0800 -- .../sql/catalyst/QueryPlanningTracker.scala | 45 .../catalyst/QueryPlanningTrackerSuite.scala| 18 +--- .../org/apache/spark/sql/SparkSession.scala | 2 +- .../spark/sql/execution/QueryExecution.scala| 8 ++-- .../QueryPlanningTrackerEndToEndSuite.scala | 15 +-- 5 files changed, 55 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/36edbac1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala index 244081c..cd75407 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/QueryPlanningTracker.scala @@ -41,6 +41,13 @@ object QueryPlanningTracker { val OPTIMIZATION = "optimization" val PLANNING = "planning" + /** + * Summary 
for a rule. + * @param totalTimeNs total amount of time, in nanosecs, spent in this rule. + * @param numInvocations number of times the rule has been invoked. + * @param numEffectiveInvocations number of times the rule has been invoked and + *resulted in a plan change. + */ class RuleSummary( var totalTimeNs: Long, var numInvocations: Long, var numEffectiveInvocations: Long) { @@ -52,6 +59,18 @@ object QueryPlanningTracker { } /** + * Summary of a phase, with start time and end time so we can construct a timeline. + */ + class PhaseSummary(val startTimeMs: Long, val endTimeMs: Long) { + +def durationMs: Long = endTimeMs - startTimeMs + +override def toString: String = { + s"PhaseSummary($startTimeMs, $endTimeMs)" +} + } + + /** * A thread local variable to implicitly pass the tracker around. This assumes the query planner * is single-threaded, and avoids passing the same tracker context in every function call. */ @@ -79,15 +98,25 @@ class QueryPlanningTracker { // Use a Java HashMap for less overhead. private val rulesMap = new java.util.HashMap[String, RuleSummary] - // From a phase to time in ns. - private val phaseToTimeNs = new java.util.HashMap[String, Long] + // From a phase to its start time and end time, in ms. + private val phasesMap = new java.util.HashMap[String, PhaseSummary] - /** Measure the runtime of function f, and add it to the time for the specified phase. */ - def measureTime[T](phase: String)(f: => T): T = { -val startTime = System.nanoTime() + /** + * Measure the start and end time of a phase. Note that if this function is called multiple + * times for the same phase, the recorded start time will be the start time of the first call, + * and the recorded end time will be the end time of the last call. 
+ */ + def measurePhase[T](phase: String)(f: => T): T = { +val startTime = System.currentTimeMillis() val ret = f -val timeTaken = System.nanoTime() - startTime -phaseToTimeNs.put(phase, phaseToTimeNs.getOrDefault(phase, 0) + timeTaken) +val endTime = System.currentTimeMillis + +if (phasesMap.containsKey(phase)) { + val oldSummary = phasesMap.get(phase) + phasesMap.put(phase, new PhaseSummary(oldSummary.startTimeMs, endTime)) +} else { + phasesMap.put(phase, new PhaseSummary(startTime, endTime)) +} ret } @@ -114,7 +143,7 @@ class QueryPlanningTracker { def rules: Map[String, RuleSummary] = rulesMap.asScala.toMap - def phases: Map[String, Long] = phaseToTimeNs.asScala.toMap + def phases: Map[String, PhaseSummary] = phasesMap.asScala.toMap
svn commit: r31273 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_30_13_29-9b23be2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 30 21:42:23 2018 New Revision: 31273 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_30_13_29-9b23be2 docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31262 - in /dev/spark/2.4.1-SNAPSHOT-2018_11_30_11_24-b68decf-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 30 19:40:39 2018 New Revision: 31262 Log: Apache Spark 2.4.1-SNAPSHOT-2018_11_30_11_24-b68decf docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31261 - in /dev/spark/2.3.3-SNAPSHOT-2018_11_30_11_23-4ee463a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 30 19:38:44 2018 New Revision: 31261 Log: Apache Spark 2.3.3-SNAPSHOT-2018_11_30_11_23-4ee463a docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-26201] Fix python broadcast with encryption
Repository: spark Updated Branches: refs/heads/branch-2.4 4661ac76a -> b68decf19 [SPARK-26201] Fix python broadcast with encryption ## What changes were proposed in this pull request? Python with rpc and disk encryption enabled along with a python broadcast variable and just read the value back on the driver side the job failed with: Traceback (most recent call last): File "broadcast.py", line 37, in words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File "pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of input To reproduce use configs: --conf spark.network.crypto.enabled=true --conf spark.io.encryption.enabled=true Code: words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) words_new.value print(words_new.value) ## How was this patch tested? words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) textFile = sc.textFile("README.md") wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a+b) count = wordCounts.count() print(count) words_new.value print(words_new.value) Closes #23166 from redsanket/SPARK-26201. 
Authored-by: schintap Signed-off-by: Thomas Graves (cherry picked from commit 9b23be2e95fec756066ca0ed3188c3db2602b757) Signed-off-by: Thomas Graves Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b68decf1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b68decf1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b68decf1 Branch: refs/heads/branch-2.4 Commit: b68decf190e402e3d29fa05726b16bd57fe1b078 Parents: 4661ac7 Author: schintap Authored: Fri Nov 30 12:48:56 2018 -0600 Committer: Thomas Graves Committed: Fri Nov 30 12:49:17 2018 -0600 -- .../org/apache/spark/api/python/PythonRDD.scala | 29 +--- python/pyspark/broadcast.py | 21 ++ python/pyspark/test_broadcast.py| 15 ++ 3 files changed, 56 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b68decf1/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 8b5a7a9..5ed5070 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -660,6 +660,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial with Logging { private var encryptionServer: PythonServer[Unit] = null + private var decryptionServer: PythonServer[Unit] = null /** * Read data from disks, then copy it to `out` @@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial override def handleConnection(sock: Socket): Unit = { val env = SparkEnv.get val in = sock.getInputStream() -val dir = new File(Utils.getLocalDir(env.conf)) -val file = File.createTempFile("broadcast", "", dir) -path = file.getAbsolutePath -val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path)) +val abspath = new 
File(path).getAbsolutePath +val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath)) DechunkedInputStream.dechunkAndCopyToOutput(in, out) } } Array(encryptionServer.port, encryptionServer.secret) } + def setupDecryptionServer(): Array[Any] = { +decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") { + override def handleConnection(sock: Socket): Unit = { +val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())) +Utils.tryWithSafeFinally { + val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path)) + Utils.tryWithSafeFinally { +Utils.copyStream(in, out, false) + } { +in.close() + } + out.flush() +} { + JavaUtils.closeQuietly(out) +} + } +} +Array(decryptionServer.port, decryptionServer.secret) + } + + def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult() + def waitTillDataReceived(): Unit = encryptionServer.getResult() } // scalastyle:on no.finalize http://git-wip-us.apache.org/repos/asf/spark/blob/b68decf1/python/pyspark/broadcast.py -- diff --git
spark git commit: [SPARK-26201] Fix python broadcast with encryption
Repository: spark Updated Branches: refs/heads/master c3f27b243 -> 9b23be2e9 [SPARK-26201] Fix python broadcast with encryption ## What changes were proposed in this pull request? Python with rpc and disk encryption enabled along with a python broadcast variable and just read the value back on the driver side the job failed with: Traceback (most recent call last): File "broadcast.py", line 37, in words_new.value File "/pyspark.zip/pyspark/broadcast.py", line 137, in value File "pyspark.zip/pyspark/broadcast.py", line 122, in load_from_path File "pyspark.zip/pyspark/broadcast.py", line 128, in load EOFError: Ran out of input To reproduce use configs: --conf spark.network.crypto.enabled=true --conf spark.io.encryption.enabled=true Code: words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) words_new.value print(words_new.value) ## How was this patch tested? words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) textFile = sc.textFile("README.md") wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word + words_new.value[1], 1)).reduceByKey(lambda a, b: a+b) count = wordCounts.count() print(count) words_new.value print(words_new.value) Closes #23166 from redsanket/SPARK-26201. 
Authored-by: schintap Signed-off-by: Thomas Graves Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b23be2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b23be2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b23be2e Branch: refs/heads/master Commit: 9b23be2e95fec756066ca0ed3188c3db2602b757 Parents: c3f27b2 Author: schintap Authored: Fri Nov 30 12:48:56 2018 -0600 Committer: Thomas Graves Committed: Fri Nov 30 12:48:56 2018 -0600 -- .../org/apache/spark/api/python/PythonRDD.scala | 29 +--- python/pyspark/broadcast.py | 21 ++ python/pyspark/tests/test_broadcast.py | 15 ++ 3 files changed, 56 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b23be2e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 8b5a7a9..5ed5070 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -660,6 +660,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial with Logging { private var encryptionServer: PythonServer[Unit] = null + private var decryptionServer: PythonServer[Unit] = null /** * Read data from disks, then copy it to `out` @@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial override def handleConnection(sock: Socket): Unit = { val env = SparkEnv.get val in = sock.getInputStream() -val dir = new File(Utils.getLocalDir(env.conf)) -val file = File.createTempFile("broadcast", "", dir) -path = file.getAbsolutePath -val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path)) +val abspath = new File(path).getAbsolutePath +val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath)) 
DechunkedInputStream.dechunkAndCopyToOutput(in, out) } } Array(encryptionServer.port, encryptionServer.secret) } + def setupDecryptionServer(): Array[Any] = { +decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") { + override def handleConnection(sock: Socket): Unit = { +val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())) +Utils.tryWithSafeFinally { + val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path)) + Utils.tryWithSafeFinally { +Utils.copyStream(in, out, false) + } { +in.close() + } + out.flush() +} { + JavaUtils.closeQuietly(out) +} + } +} +Array(decryptionServer.port, decryptionServer.secret) + } + + def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult() + def waitTillDataReceived(): Unit = encryptionServer.getResult() } // scalastyle:on no.finalize http://git-wip-us.apache.org/repos/asf/spark/blob/9b23be2e/python/pyspark/broadcast.py -- diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 1c7f2a7..29358b5 100644 ---
svn commit: r31257 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_30_09_17-c3f27b2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 30 17:29:15 2018 New Revision: 31257 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_30_09_17-c3f27b2 docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][DOCS] Fix typos
Repository: spark Updated Branches: refs/heads/master 2b2c94a3e -> c3f27b243 [MINOR][DOCS] Fix typos ## What changes were proposed in this pull request? Fix Typos. This PR is the complete version of https://github.com/apache/spark/pull/23145. ## How was this patch tested? NA Closes #23185 from kjmrknsn/docUpdate. Authored-by: Keiji Yoshida Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3f27b24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3f27b24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3f27b24 Branch: refs/heads/master Commit: c3f27b2437497396913fdec96f085c3626ef4e59 Parents: 2b2c94a Author: Keiji Yoshida Authored: Fri Nov 30 09:03:46 2018 -0600 Committer: Sean Owen Committed: Fri Nov 30 09:03:46 2018 -0600 -- docs/configuration.md| 2 +- docs/graphx-programming-guide.md | 4 ++-- docs/ml-datasource.md| 2 +- docs/ml-features.md | 8 docs/ml-pipeline.md | 2 +- docs/mllib-linear-methods.md | 4 ++-- docs/security.md | 2 +- docs/sparkr.md | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3f27b24/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index 04210d8..8914bd0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -498,7 +498,7 @@ Apart from these, the following properties are also available, and may be useful Reuse Python worker or not. If yes, it will use a fixed number of Python workers, does not need to fork() a Python process for every task. It will be very useful -if there is large broadcast, then the broadcast will not be needed to transferred +if there is a large broadcast, then the broadcast will not need to be transferred from JVM to Python worker for every task. 
http://git-wip-us.apache.org/repos/asf/spark/blob/c3f27b24/docs/graphx-programming-guide.md -- diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index cb96fd7..ecedeaf 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -522,7 +522,7 @@ val joinedGraph = graph.joinVertices(uniqueCosts, A key step in many graph analytics tasks is aggregating information about the neighborhood of each vertex. -For example, we might want to know the number of followers each user has or the average age of the +For example, we might want to know the number of followers each user has or the average age of the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and connected components) repeatedly aggregate properties of neighboring vertices (e.g., current PageRank Value, shortest path to the source, and smallest reachable vertex id). @@ -700,7 +700,7 @@ a new value for the vertex property, and then send messages to neighboring verti super step. Unlike Pregel, messages are computed in parallel as a function of the edge triplet and the message computation has access to both the source and destination vertex attributes. Vertices that do not receive a message are skipped within a super -step. The Pregel operators terminates iteration and returns the final graph when there are no +step. The Pregel operator terminates iteration and returns the final graph when there are no messages remaining. > Note, unlike more standard Pregel implementations, vertices in GraphX can > only send messages to http://git-wip-us.apache.org/repos/asf/spark/blob/c3f27b24/docs/ml-datasource.md -- diff --git a/docs/ml-datasource.md b/docs/ml-datasource.md index 1508332..35afaef 100644 --- a/docs/ml-datasource.md +++ b/docs/ml-datasource.md @@ -5,7 +5,7 @@ displayTitle: Data sources --- In this section, we introduce how to use data source in ML to load data. 
-Beside some general data sources such as Parquet, CSV, JSON and JDBC, we also provide some specific data sources for ML. +Besides some general data sources such as Parquet, CSV, JSON and JDBC, we also provide some specific data sources for ML. **Table of Contents** http://git-wip-us.apache.org/repos/asf/spark/blob/c3f27b24/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index 83a211c..a140bc6 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -359,7 +359,7 @@ Assume that we have the following DataFrame with columns `id` and `raw`: id | raw
svn commit: r31238 - in /dev/spark/3.0.0-SNAPSHOT-2018_11_30_00_50-2b2c94a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Nov 30 09:02:24 2018 New Revision: 31238 Log: Apache Spark 3.0.0-SNAPSHOT-2018_11_30_00_50-2b2c94a docs [This commit notification would consist of 1764 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[1/2] spark git commit: [SPARK-25528][SQL] data source v2 API refactor (batch read)
Repository: spark Updated Branches: refs/heads/master 9cfc3ee62 -> 2b2c94a3e http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java -- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java new file mode 100644 index 000..cb5954d --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.sql.sources.v2; + +import java.io.IOException; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.sources.v2.SupportsBatchRead; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.types.StructType; + +abstract class JavaSimpleBatchTable implements Table, SupportsBatchRead { + + @Override + public StructType schema() { +return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public String name() { +return this.getClass().toString(); + } +} + +abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch { + + @Override + public Scan build() { +return this; + } + + @Override + public Batch toBatch() { +return this; + } + + @Override + public StructType readSchema() { +return new StructType().add("i", "int").add("j", "int"); + } + + @Override + public PartitionReaderFactory createReaderFactory() { +return new JavaSimpleReaderFactory(); + } +} + +class JavaSimpleReaderFactory implements PartitionReaderFactory { + + @Override + public PartitionReader createReader(InputPartition partition) { +JavaRangeInputPartition p = (JavaRangeInputPartition) partition; +return new PartitionReader() { + private int current = p.start - 1; + + @Override + public boolean next() throws IOException { +current += 1; +return current < p.end; + } + + @Override + public InternalRow get() { +return new GenericInternalRow(new Object[] {current, -current}); + } + + @Override + public void close() throws IOException { + + } +}; + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java -- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 2cdbba8..852c454 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -17,17 +17,17 @@ package test.org.apache.spark.sql.sources.v2; -import org.apache.spark.sql.sources.v2.BatchReadSupportProvider; -import org.apache.spark.sql.sources.v2.DataSourceV2; import org.apache.spark.sql.sources.v2.DataSourceOptions; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; -public class JavaSimpleDataSourceV2 implements DataSourceV2, BatchReadSupportProvider { +public class JavaSimpleDataSourceV2 implements TableProvider { - class ReadSupport extends JavaSimpleReadSupport { + class MyScanBuilder extends JavaSimpleScanBuilder { @Override -public InputPartition[] planInputPartitions(ScanConfig config) { +public InputPartition[] planInputPartitions() { InputPartition[] partitions = new InputPartition[2]; partitions[0] = new JavaRangeInputPartition(0, 5); partitions[1] = new JavaRangeInputPartition(5, 10); @@ -36,7 +36,12 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, BatchReadSupportPro }
[2/2] spark git commit: [SPARK-25528][SQL] data source v2 API refactor (batch read)
[SPARK-25528][SQL] data source v2 API refactor (batch read) ## What changes were proposed in this pull request? This is the first step of the data source v2 API refactor [proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing) It adds the new API for batch read, without removing the old APIs, as they are still needed for streaming sources. More concretely, it adds 1. `TableProvider`, works like an anonymous catalog 2. `Table`, represents a structured data set. 3. `ScanBuilder` and `Scan`, a logical represents of data source scan 4. `Batch`, a physical representation of data source batch scan. ## How was this patch tested? existing tests Closes #23086 from cloud-fan/refactor-batch. Authored-by: Wenchen Fan Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2c94a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2c94a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2c94a3 Branch: refs/heads/master Commit: 2b2c94a3ee89630047bcdd416a977e0d1cdb1926 Parents: 9cfc3ee Author: Wenchen Fan Authored: Fri Nov 30 00:02:43 2018 -0800 Committer: gatorsmile Committed: Fri Nov 30 00:02:43 2018 -0800 -- .../kafka010/KafkaContinuousSourceSuite.scala | 4 +- .../sql/kafka010/KafkaContinuousTest.scala | 4 +- project/MimaExcludes.scala | 48 ++-- .../spark/sql/sources/v2/SupportsBatchRead.java | 33 +++ .../org/apache/spark/sql/sources/v2/Table.java | 59 + .../spark/sql/sources/v2/TableProvider.java | 64 + .../spark/sql/sources/v2/reader/Batch.java | 48 .../reader/OldSupportsReportPartitioning.java | 38 +++ .../v2/reader/OldSupportsReportStatistics.java | 38 +++ .../spark/sql/sources/v2/reader/Scan.java | 68 + .../sql/sources/v2/reader/ScanBuilder.java | 30 +++ .../spark/sql/sources/v2/reader/ScanConfig.java | 4 +- .../spark/sql/sources/v2/reader/Statistics.java | 2 +- .../v2/reader/SupportsPushDownFilters.java | 4 +- 
.../reader/SupportsPushDownRequiredColumns.java | 4 +- .../v2/reader/SupportsReportPartitioning.java | 8 +- .../v2/reader/SupportsReportStatistics.java | 6 +- .../v2/reader/partitioning/Partitioning.java| 3 +- .../org/apache/spark/sql/DataFrameReader.scala | 36 +-- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../datasources/v2/DataSourceV2Relation.scala | 90 +++ .../datasources/v2/DataSourceV2ScanExec.scala | 68 ++--- .../datasources/v2/DataSourceV2Strategy.scala | 34 +-- .../v2/DataSourceV2StreamingScanExec.scala | 120 + .../execution/streaming/ProgressReporter.scala | 4 +- .../continuous/ContinuousExecution.scala| 5 +- .../sources/v2/JavaAdvancedDataSourceV2.java| 116 + .../sources/v2/JavaColumnarDataSourceV2.java| 27 +- .../v2/JavaPartitionAwareDataSource.java| 29 ++- .../v2/JavaSchemaRequiredDataSource.java| 36 ++- .../sql/sources/v2/JavaSimpleBatchTable.java| 91 +++ .../sql/sources/v2/JavaSimpleDataSourceV2.java | 19 +- .../sql/sources/v2/JavaSimpleReadSupport.java | 90 --- .../sql/sources/v2/DataSourceV2Suite.scala | 260 ++- .../sources/v2/SimpleWritableDataSource.scala | 35 +-- .../streaming/continuous/ContinuousSuite.scala | 4 +- 36 files changed, 1016 insertions(+), 515 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index af51021..9ba066a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.clients.producer.ProducerRecord import 
org.apache.spark.sql.Dataset -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.streaming.Trigger @@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert(