Spark3.3 with parquet 1.10.x

2023-07-24 Thread Pralabh Kumar
Hi Spark Users,

I have a quick question about Spark 3.3. Spark 3.3 is currently built with
Parquet 1.12. Has anyone tried Spark 3.3 with Parquet 1.10?

We at Uber are planning to migrate to Spark 3.3, but we are limited to using
Parquet 1.10. Has anyone tried building Spark 3.3 with Parquet 1.10? What are
the dos and don'ts?


Regards
Pralabh Kumar


Re: Driver takes long time to finish once job ends

2022-11-22 Thread Pralabh Kumar
What are the cores and memory settings of the driver?

On Wed, 23 Nov 2022, 12:56 Pralabh Kumar,  wrote:

> How many cores and  u are running driver with?
>
> On Tue, 22 Nov 2022, 21:00 Nikhil Goyal,  wrote:
>
>> Hi folks,
>> We are running a job on our on prem cluster on K8s but writing the output
>> to S3. We noticed that all the executors finish in < 1h but the driver
>> takes another 5h to finish. Logs:
>>
>> 22/11/22 02:08:29 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 
>> 10.42.145.11:39001 in memory (size: 7.3 KiB, free: 9.4 GiB)
>> 22/11/22 *02:08:29* INFO BlockManagerInfo: Removed broadcast_3_piece0 on 
>> 10.42.137.10:33425 in memory (size: 7.3 KiB, free: 9.4 GiB)
>> 22/11/22 *04:57:46* INFO FileFormatWriter: Write Job 
>> 4f0051fc-dda9-457f-a072-26311fd5e132 committed.
>> 22/11/22 04:57:46 INFO FileFormatWriter: Finished processing stats for write 
>> job 4f0051fc-dda9-457f-a072-26311fd5e132.
>> 22/11/22 04:57:47 INFO FileUtils: Creating directory if it doesn't exist: 
>> s3://rbx.usr/masked/dw_pii/creator_analytics_user_universe_first_playsession_dc_ngoyal/ds=2022-10-21
>> 22/11/22 04:57:48 INFO SessionState: Could not get hdfsEncryptionShim, it is 
>> only applicable to hdfs filesystem.
>> 22/11/22 *04:57:48* INFO SessionState: Could not get hdfsEncryptionShim, it 
>> is only applicable to hdfs filesystem.
>> 22/11/22 *07:20:20* WARN ExecutorPodsWatchSnapshotSource: Kubernetes client 
>> has been closed (this is expected if the application is shutting down.)
>> 22/11/22 07:20:22 INFO MapOutputTrackerMasterEndpoint: 
>> MapOutputTrackerMasterEndpoint stopped!
>> 22/11/22 07:20:22 INFO MemoryStore: MemoryStore cleared
>> 22/11/22 07:20:22 INFO BlockManager: BlockManager stopped
>> 22/11/22 07:20:22 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 22/11/22 07:20:22 INFO 
>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
>> OutputCommitCoordinator stopped!
>> 22/11/22 07:20:22 INFO SparkContext: Successfully stopped SparkContext
>> 22/11/22 07:20:22 INFO ShutdownHookManager: Shutdown hook called
>> 22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory 
>> /tmp/spark-d9aa302f-86f2-4668-9c01-07b3e71cba82
>> 22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory 
>> /var/data/spark-5295849e-a0f3-4355-9a6a-b510616aefaa/spark-43772336-8c86-4e2b-839e-97b2442b2959
>> 22/11/22 07:20:22 INFO MetricsSystemImpl: Stopping s3a-file-system metrics 
>> system...
>> 22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics system 
>> stopped.
>> 22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics system 
>> shutdown complete.
>>
>> Seems like the job is taking time to write to S3. Any idea how to fix this 
>> issue?
>>
>> Thanks
>>
>>
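The long gaps above (02:08 to 04:57 before the write job commits, and again after it) usually come
from the output-commit and post-commit phases of writing to S3 through a rename-based committer,
rather than from the executors. A hedged sketch of settings commonly tried for this; the S3A
committer classes are assumptions that require the spark-hadoop-cloud module and a matching
hadoop-aws version, so treat them as a direction to verify rather than a confirmed fix:

import org.apache.spark.sql.SparkSession

// Sketch only: property names are standard Spark/Hadoop settings, but whether they
// help depends on the Hadoop/S3A versions deployed on the cluster.
val spark = SparkSession.builder()
  .appName("s3-write-job")
  // v2 of the FileOutputCommitter algorithm avoids the serial rename of every task's
  // output during job commit, which is what usually dominates commit time on S3.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  // With hadoop-aws 3.1+ and the spark-hadoop-cloud module, the dedicated S3A
  // committers avoid renames entirely (uncomment only if those jars are available):
  // .config("spark.hadoop.fs.s3a.committer.name", "magic")
  // .config("spark.sql.sources.commitProtocolClass",
  //   "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  // .config("spark.sql.parquet.output.committer.class",
  //   "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()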


Re: Driver takes long time to finish once job ends

2022-11-22 Thread Pralabh Kumar
How many cores and how much memory are you running the driver with?

On Tue, 22 Nov 2022, 21:00 Nikhil Goyal,  wrote:

> Hi folks,
> We are running a job on our on prem cluster on K8s but writing the output
> to S3. We noticed that all the executors finish in < 1h but the driver
> takes another 5h to finish. Logs:
>
> 22/11/22 02:08:29 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 
> 10.42.145.11:39001 in memory (size: 7.3 KiB, free: 9.4 GiB)
> 22/11/22 *02:08:29* INFO BlockManagerInfo: Removed broadcast_3_piece0 on 
> 10.42.137.10:33425 in memory (size: 7.3 KiB, free: 9.4 GiB)
> 22/11/22 *04:57:46* INFO FileFormatWriter: Write Job 
> 4f0051fc-dda9-457f-a072-26311fd5e132 committed.
> 22/11/22 04:57:46 INFO FileFormatWriter: Finished processing stats for write 
> job 4f0051fc-dda9-457f-a072-26311fd5e132.
> 22/11/22 04:57:47 INFO FileUtils: Creating directory if it doesn't exist: 
> s3://rbx.usr/masked/dw_pii/creator_analytics_user_universe_first_playsession_dc_ngoyal/ds=2022-10-21
> 22/11/22 04:57:48 INFO SessionState: Could not get hdfsEncryptionShim, it is 
> only applicable to hdfs filesystem.
> 22/11/22 *04:57:48* INFO SessionState: Could not get hdfsEncryptionShim, it 
> is only applicable to hdfs filesystem.
> 22/11/22 *07:20:20* WARN ExecutorPodsWatchSnapshotSource: Kubernetes client 
> has been closed (this is expected if the application is shutting down.)
> 22/11/22 07:20:22 INFO MapOutputTrackerMasterEndpoint: 
> MapOutputTrackerMasterEndpoint stopped!
> 22/11/22 07:20:22 INFO MemoryStore: MemoryStore cleared
> 22/11/22 07:20:22 INFO BlockManager: BlockManager stopped
> 22/11/22 07:20:22 INFO BlockManagerMaster: BlockManagerMaster stopped
> 22/11/22 07:20:22 INFO 
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
> OutputCommitCoordinator stopped!
> 22/11/22 07:20:22 INFO SparkContext: Successfully stopped SparkContext
> 22/11/22 07:20:22 INFO ShutdownHookManager: Shutdown hook called
> 22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory 
> /tmp/spark-d9aa302f-86f2-4668-9c01-07b3e71cba82
> 22/11/22 07:20:22 INFO ShutdownHookManager: Deleting directory 
> /var/data/spark-5295849e-a0f3-4355-9a6a-b510616aefaa/spark-43772336-8c86-4e2b-839e-97b2442b2959
> 22/11/22 07:20:22 INFO MetricsSystemImpl: Stopping s3a-file-system metrics 
> system...
> 22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics system 
> stopped.
> 22/11/22 07:20:22 INFO MetricsSystemImpl: s3a-file-system metrics system 
> shutdown complete.
>
> Seems like the job is taking time to write to S3. Any idea how to fix this 
> issue?
>
> Thanks
>
>


Re: Spark3.2 on K8s with proxy-user

2022-04-21 Thread Pralabh Kumar
Further information: I have a kerberized cluster and am also doing the kinit.
The problem only occurs when the proxy user is used.

On Fri, Apr 22, 2022 at 10:21 AM Pralabh Kumar 
wrote:

> Hi
>
> Running Spark 3.2 on K8s with --proxy-user and getting below error and
> then the job fails . However when running without a proxy user job is
> running fine . Can anyone please help me with the same .
>
>
> 22/04/21 17:50:30 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:50:31 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:50:37 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:50:53 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:51:32 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:52:07 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:52:27 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
> 22/04/21 17:52:53 WARN Client: Exception encountered while connecting to
> the server : org.apache.hadoop.security.AccessControlException: Client
> cannot authenticate via:[TOKEN, KERBEROS]
>
>


Spark3.2 on K8s with proxy-user

2022-04-21 Thread Pralabh Kumar
Hi

I am running Spark 3.2 on K8s with --proxy-user, getting the error below, and
then the job fails. However, when running without a proxy user the job runs
fine. Can anyone please help me with this?


22/04/21 17:50:30 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:50:31 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:50:37 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:50:53 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:51:32 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:52:07 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:52:27 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]

22/04/21 17:52:53 WARN Client: Exception encountered while connecting to
the server : org.apache.hadoop.security.AccessControlException: Client
cannot authenticate via:[TOKEN, KERBEROS]


Spark 3.0.1 and spark 3.2 compatibility

2022-04-07 Thread Pralabh Kumar
Hi spark community

I have a quick question. I am planning to migrate from Spark 3.0.1 to Spark
3.2.

Do I need to recompile my application with 3.2 dependencies, or will an
application compiled with 3.0.1 work fine on 3.2?


Regards
Pralabh kumar


Spark on K8s , some applications ended ungracefully

2022-03-31 Thread Pralabh Kumar
Hi Spark Team

Some of my Spark applications on K8s ended with the error below. Although these
applications completed successfully (the event log contains a
SparkListenerApplicationEnd event at the end), they still have event files with
the .inprogress suffix. This causes the applications to be shown as in progress
in the Spark History Server (SHS).

Spark v : 3.0.1



22/03/31 08:33:34 WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout,
java.util.concurrent.TimeoutException

java.util.concurrent.TimeoutException

at java.util.concurrent.FutureTask.get(FutureTask.java:205)

at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:68)

22/03/31 08:33:34 WARN SparkContext: Ignoring Exception while stopping
SparkContext from shutdown hook

java.lang.InterruptedException

at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)

at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)

at
org.apache.spark.util.ThreadUtils$.shutdown(ThreadUtils.scala:348)





Please let me know if there is a solution for this.

Regards

Pralabh Kumar


Skip single integration test case in Spark on K8s

2022-03-16 Thread Pralabh Kumar
Hi Spark team

I am running the Spark Kubernetes integration test suite on a cloud cluster.

build/mvn install \
  -f pom.xml \
  -pl resource-managers/kubernetes/integration-tests -am \
  -Pscala-2.12 -Phadoop-3.1.1 -Phive -Phive-thriftserver -Pyarn -Pkubernetes \
  -Pkubernetes-integration-tests \
  -Djava.version=8 \
  -Dspark.kubernetes.test.sparkTgz= \
  -Dspark.kubernetes.test.imageTag=<> \
  -Dspark.kubernetes.test.imageRepo=<repo> \
  -Dspark.kubernetes.test.deployMode=cloud \
  -Dtest.include.tags=k8s \
  -Dspark.kubernetes.test.javaImageTag= \
  -Dspark.kubernetes.test.namespace= \
  -Dspark.kubernetes.test.serviceAccountName=spark \
  -Dspark.kubernetes.test.kubeConfigContext=<> \
  -Dspark.kubernetes.test.master=<> \
  -Dspark.kubernetes.test.jvmImage=<> \
  -Dspark.kubernetes.test.pythonImage=<> \
  -Dlog4j.logger.org.apache.spark=DEBUG



I am able to run some test cases successfully, while some are failing. For
example, "Run SparkRemoteFileTest using a Remote data file" in KubernetesSuite
is failing.


Is there a way to skip running some of the test cases?



Please help me with this.


Regards

Pralabh Kumar


Spark on K8s : property simillar to yarn.max.application.attempt

2022-02-04 Thread Pralabh Kumar
Hi Spark Team

I am running Spark on K8s and am looking for a property/mechanism similar to
yarn.max.application.attempt. I know this is not really a Spark question, but I
thought someone might have faced a similar issue.

Basically, if my driver pod fails, I want it to be retried on a different
machine. Is there a way to do that?

Regards
Pralabh Kumar


Re: Spark on k8s : spark 3.0.1 spark.kubernetes.executor.deleteontermination issue

2022-01-18 Thread Pralabh Kumar
Does the property spark.kubernetes.executor.deleteOnTermination check whether
the executor being deleted has shuffle data or not?

On Tue, 18 Jan 2022, 11:20 Pralabh Kumar,  wrote:

> Hi spark team
>
> Have cluster wide property spark.kubernetis.executor.deleteontermination
> to true.
> During the long running job, some of the executor got deleted which have
> shuffle data. Because of this,  in the subsequent stage , we get lot of
> spark shuffle fetch fail exceptions.
>
>
> Please let me know , is there a way to fix it. Note if setting above
> property to false , I face no shuffle fetch exception.
>
>
> Regards
> Pralabh
>


Spark on k8s : spark 3.0.1 spark.kubernetes.executor.deleteontermination issue

2022-01-17 Thread Pralabh Kumar
Hi spark team

We have the cluster-wide property spark.kubernetes.executor.deleteOnTermination
set to true.
During a long-running job, some of the executors that held shuffle data were
deleted. Because of this, in the subsequent stage we get a lot of shuffle fetch
failed exceptions.


Please let me know if there is a way to fix this. Note that if I set the above
property to false, I see no shuffle fetch exceptions.


Regards
Pralabh


Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-10 Thread Pralabh Kumar
Hi Spark Team

When creating a database via Spark 3.0 on Hive:

1) spark.sql("create database test location '/user/hive'") creates the database
location on HDFS, as expected.

2) When running the same command on 3.1, the database is created on the local
file system by default. I have to prefix the path with hdfs:// to create the
database on HDFS.

Why is there a difference in behavior? Can you please point me to the JIRA
which caused this change?

Note: spark.sql.warehouse.dir and hive.metastore.warehouse.dir both have
default values (not explicitly set).

Regards
Pralabh Kumar


ivy unit test case failing for Spark

2021-12-21 Thread Pralabh Kumar
Hi Spark Team

I am building Spark inside a VPN, but the unit test case below is failing. It
points to an Ivy location which cannot be reached from within the VPN. Any help
would be appreciated.

test("SPARK-33084: Add jar support Ivy URI -- default transitive = true") {
  *sc *= new SparkContext(new
SparkConf().setAppName("test").setMaster("local-cluster[3,
1, 1024]"))
  *sc*.addJar("*ivy://org.apache.hive:hive-storage-api:2.7.0*")
  assert(*sc*.listJars().exists(_.contains(
"org.apache.hive_hive-storage-api-2.7.0.jar")))
  assert(*sc*.listJars().exists(_.contains(
"commons-lang_commons-lang-2.6.jar")))
}

Error

- SPARK-33084: Add jar support Ivy URI -- default transitive = true ***
FAILED ***
java.lang.RuntimeException: [unresolved dependency:
org.apache.hive#hive-storage-api;2.7.0: not found]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(
SparkSubmit.scala:1447)
at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
DependencyUtils.scala:185)
at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(
DependencyUtils.scala:159)
at org.apache.spark.SparkContext.addJar(SparkContext.scala:1996)
at org.apache.spark.SparkContext.addJar(SparkContext.scala:1928)
at org.apache.spark.SparkContextSuite.$anonfun$new$115(SparkContextSuite.
scala:1041)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
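When the default Ivy/Maven resolution cannot reach the public repositories (for example from inside
a VPN), the resolver can be pointed at an internal mirror. A sketch with placeholder paths and URLs;
whether these settings are honored for ivy:// URIs passed to SparkContext.addJar depends on the
Spark version, so treat this as a direction to investigate rather than a confirmed fix:

val conf = new SparkConf()
  .setAppName("test")
  .setMaster("local-cluster[3, 1, 1024]")
  // Option 1: a custom ivysettings.xml describing the internal mirror (placeholder path).
  .set("spark.jars.ivySettings", "/etc/spark/ivysettings.xml")
  // Option 2: add the internal repository to the resolution chain (placeholder URL).
  .set("spark.jars.repositories", "https://repo.internal.example.com/maven2")

sc = new SparkContext(conf)
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")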

Regards
Pralabh Kumar


Log4j 1.2.17 spark CVE

2021-12-12 Thread Pralabh Kumar
Hi developers,  users

Spark is built using log4j 1.2.17. Is there a plan to upgrade, given the
recently detected CVE?


Regards
Pralabh kumar


Spark Thriftserver is failing for when submitting command from beeline

2021-08-20 Thread Pralabh Kumar
Hi Dev

Environment details

Hadoop 3.2
Hive 3.1
Spark 3.0.3

Cluster: Kerberized.

1) Hive server is running fine.
2) spark-sql, spark-shell, spark-submit: everything is working as expected.
3) Connecting to Hive through beeline works fine (after kinit):
beeline -u "jdbc:hive2://:/default;principal=

I then launched the Spark Thrift Server (STS) and tried to connect to it through beeline.

The beeline client connects to STS without problems.

4) beeline -u "jdbc:hive2://:/default;principal=
   a) The log says connected to:
   Spark SQL
   Driver: Hive JDBC


Now when I run any command ("show tables") it fails. The log in STS says:

21/08/19 19:30:12 DEBUG UserGroupInformation: PrivilegedAction as:
 (auth:PROXY) via  (auth:KERBEROS)
from:org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge$Client.createClientTransport(HadoopThriftAuthBridge.java:208)
21/08/19 19:30:12 DEBUG UserGroupInformation: PrivilegedAction as:
 (auth:PROXY) via  (auth:KERBEROS)
from:org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
21/08/19 19:30:12 DEBUG TSaslTransport: opening transport
org.apache.thrift.transport.TSaslClientTransport@f43fd2f
21/08/19 19:30:12 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]
at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:95)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
at
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:38)
at
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:480)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:247)
at
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:70)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1707)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:83)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1556)
at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1545)
at
org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$databaseExists$1(HiveClientImpl.scala:384)




My guess is that authorization through the proxy user is not working.



Please help


Regards
Pralabh Kumar


Re: Hive on Spark vs Spark on Hive(HiveContext)

2021-07-01 Thread Pralabh Kumar
Hi Mich,

Thanks for replying; your answer really helps. The comparison was done in 2016,
and I would like to know about the latest comparison with Spark 3.0.

Also, what you are suggesting is to migrate the queries to Spark (i.e.
HiveContext rather than Hive on Spark), which is what Facebook also did. Is
that understanding correct?

Regards
Pralabh

On Thu, 1 Jul 2021, 15:44 Mich Talebzadeh, 
wrote:

> Hi Prahabh,
>
> This question has been asked before :)
>
> Few years ago (late 2016),  I made a presentation on running Hive Queries
> on the Spark execution engine for Hortonworks.
>
>
> https://www.slideshare.net/MichTalebzadeh1/query-engines-for-hive-mr-spark-tez-with-llap-considerations
>
> The issue you will face will be compatibility problems with versions of
> Hive and Spark.
>
> My suggestion would be to use Spark as a massive parallel processing and
> Hive as a storage layer. However, you need to test what can be migrated or
> not.
>
> HTH
>
>
> Mich
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 1 Jul 2021 at 10:52, Pralabh Kumar  wrote:
>
>> Hi Dev
>>
>> I am having thousands of legacy hive queries .  As a plan to move to
>> Spark , we are planning to migrate Hive queries on Spark .  Now there are
>> two approaches
>>
>>
>>1.  One is Hive on Spark , which is similar to changing the execution
>>engine in hive queries like TEZ.
>>2. Another one is migrating hive queries to Hivecontext/sparksql , an
>>approach used by Facebook and presented in Spark conference.
>>
>> https://databricks.com/session/experiences-migrating-hive-workload-to-sparksql#:~:text=Spark%20SQL%20in%20Apache%20Spark,SQL%20with%20minimal%20user%20intervention
>>.
>>
>>
>> Can you please guide me which option to go for . I am personally inclined
>> to go for option 2 . It also allows the use of the latest spark .
>>
>> Please help me on the same , as there are not much comparisons online
>> available keeping Spark 3.0 in perspective.
>>
>> Regards
>> Pralabh Kumar
>>
>>
>>


Hive on Spark vs Spark on Hive(HiveContext)

2021-07-01 Thread Pralabh Kumar
Hi Dev

I have thousands of legacy Hive queries. As part of the plan to move to Spark,
we are planning to migrate the Hive queries to Spark. There are two
approaches:


   1. Hive on Spark, which is similar to changing the execution engine of the
   Hive queries (like Tez).
   2. Migrating the Hive queries to HiveContext/Spark SQL, an approach used by
   Facebook and presented at a Spark conference:
   
https://databricks.com/session/experiences-migrating-hive-workload-to-sparksql#:~:text=Spark%20SQL%20in%20Apache%20Spark,SQL%20with%20minimal%20user%20intervention


Can you please guide me on which option to go for? I am personally inclined
toward option 2; it also allows the use of the latest Spark.

Please help me with this, as there are not many comparisons available online
that keep Spark 3.0 in perspective.

Regards
Pralabh Kumar


Unable to pickle pySpark PipelineModel

2020-12-10 Thread Pralabh Kumar
Hi Dev, User

I want to store Spark ML models in a database so that I can reuse them later.
I am unable to pickle them. However, using Scala I am able to convert them into
a byte array stream.

So, for example, I am able to do the following in Scala but not in Python:

 val modelToByteArray = new ByteArrayOutputStream()
 val oos = new ObjectOutputStream(modelToByteArray)
 oos.writeObject(model)
 oos.flush()
 oos.close()

spark.sparkContext.parallelize(Seq((model.uid, "my-neural-network-model",
  modelToByteArray.toByteArray)))
  .saveToCassandra("dfsdfs", "models", SomeColumns("uid", "name", "model"))


But pickle.dumps(model) in pyspark throws error

cannot pickle '_thread.RLock' object


Please help with this.


Regards

Pralabh


Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-02 Thread Pralabh Kumar
I am performing a join operation. If I convert the reduce-side join to a
map-side join (so no shuffle happens), I assume this error shouldn't occur.
Let me know if this understanding is correct.
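That is broadly how a broadcast (map-side) join behaves: the small side is shipped to every
executor, so the large side is not shuffled for that join and the 2 GB frame limit on shuffle
blocks is not hit there. A minimal sketch (spark is an existing SparkSession; the two DataFrames
are stand-ins); note that skewed keys in later shuffles, e.g. aggregations, can still trigger the
same error:

import org.apache.spark.sql.functions.broadcast

// Stand-ins for the two sides of the join.
val largeDf = spark.range(0L, 100000000L).withColumnRenamed("id", "join_key")
val smallDf = spark.range(0L, 1000L).withColumnRenamed("id", "join_key")

// broadcast() hints Spark to replicate smallDf to every executor and join map-side,
// avoiding a shuffle of largeDf for this join.
val joined = largeDf.join(broadcast(smallDf), Seq("join_key"))
joined.count()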

On Tue, May 1, 2018 at 9:37 PM, Ryan Blue <rb...@netflix.com> wrote:

> This is usually caused by skew. Sometimes you can work around it by in
> creasing the number of partitions like you tried, but when that doesn’t
> work you need to change the partitioning that you’re using.
>
> If you’re aggregating, try adding an intermediate aggregation. For
> example, if your query is select sum(x), a from t group by a, then try select
> sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
> group by a.
>
> rb
> ​
>
> On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar <pralabhku...@gmail.com>
> wrote:
>
>> Hi
>>
>> I am getting the above error in Spark SQL . I have increase (using 5000 )
>> number of partitions but still getting the same error .
>>
>> My data most probably is skew.
>>
>>
>>
>> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>>  at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>>  at 
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


org.apache.spark.shuffle.FetchFailedException: Too large frame:

2018-05-01 Thread Pralabh Kumar
Hi

I am getting the above error in Spark SQL. I have increased the number of
partitions (to 5000) but am still getting the same error.

My data is most probably skewed.



org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)


Best way to Hive to Spark migration

2018-04-04 Thread Pralabh Kumar
Hi Spark group

What's the best way to migrate Hive to Spark?

1) Use HiveContext of Spark
2) Use Hive on Spark (
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
)
3) Migrate Hive to Calcite to Spark SQL
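For option 1, the usual shape is to keep the HiveQL text and run it through a Hive-enabled
SparkSession (HiveContext in older Spark versions). A minimal sketch, assuming the metastore is
reachable; the query text and table names are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-query-on-spark")
  .enableHiveSupport() // talk to the existing Hive metastore
  .getOrCreate()

// Placeholder: in practice the HiveQL would be read from the legacy script files.
val legacyQuery = "SELECT dept, count(*) AS cnt FROM employees GROUP BY dept"

val result = spark.sql(legacyQuery)
result.write.mode("overwrite").saveAsTable("reports.dept_counts") // placeholder target table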


Regards


Re: Are there any alternatives to Hive "stored by" clause as Spark 2.0 does not support it

2018-02-08 Thread Pralabh Kumar
Hi Jacek

https://cwiki.apache.org/confluence/display/Hive/StorageHandlers

The motivation is to make it possible to allow Hive to access data stored
and managed by other systems in a modular, extensible fashion.


I have a Hive script which uses a custom storage handler, something like this:


Create table:


CREATE EXTERNAL TABLE $temp_output
(
  data String
)
STORED BY 'ABCStorageHandler' LOCATION '$table_location'
TBLPROPERTIES (
);


When I migrate this to Spark, it says the STORED BY operation is not permitted.
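One commonly used workaround (a sketch, not a drop-in replacement for the storage handler): if the
external system has a Spark DataSource connector, the data can be read through the DataFrame API
and registered as a view, or declared with Spark's USING ... OPTIONS syntax. The format name and
option below are hypothetical placeholders for whatever connector backs 'ABCStorageHandler':

// Hypothetical connector name and option, standing in for the ABCStorageHandler system.
val external = spark.read
  .format("com.example.abc.spark")
  .option("path", "/path/to/table_location")
  .load()

external.createOrReplaceTempView("temp_output")

// Equivalent DDL form, using Spark's USING ... OPTIONS syntax instead of Hive's STORED BY:
spark.sql(
  """CREATE TABLE temp_output_tbl
    |USING com.example.abc.spark
    |OPTIONS (path '/path/to/table_location')""".stripMargin)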

Regards
Pralabh Kumar

On Thu, Feb 8, 2018 at 6:28 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Since I'm new to Hive, what does `stored by` do? I might help a bit in
> Spark if I only knew a bit about Hive :)
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, Feb 8, 2018 at 7:25 AM, Pralabh Kumar <pralabhku...@gmail.com>
> wrote:
>
>> Hi
>>
>> Spark 2.0 doesn't support stored by . Is there any alternative to achieve
>> the same.
>>
>>
>>
>


Are there any alternatives to Hive "stored by" clause as Spark 2.0 does not support it

2018-02-07 Thread Pralabh Kumar
Hi

Spark 2.0 doesn't support STORED BY. Is there any alternative to achieve the
same result?


Re: Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
I am using spark 2.1.0

On Fri, Feb 2, 2018 at 5:08 PM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi
>
> I am performing broadcast join where my small table is 1 gb .  I am
> getting following error .
>
> I am using
>
>
> org.apache.spark.SparkException:
> . Available: 0, required: 28869232. To avoid this, increase
> spark.kryoserializer.buffer.max value
>
>
>
> I increase the value to
>
> spark.conf.set("spark.kryoserializer.buffer.max","2g")
>
>
> But I am still getting the error .
>
> Please help
>
> Thx
>


Kryo serialization failed: Buffer overflow : Broadcast Join

2018-02-02 Thread Pralabh Kumar
Hi

I am performing a broadcast join where my small table is 1 GB. I am getting the
following error.

I am using


org.apache.spark.SparkException:
. Available: 0, required: 28869232. To avoid this, increase
spark.kryoserializer.buffer.max value



I increased the value to

spark.conf.set("spark.kryoserializer.buffer.max","2g")


But I am still getting the error.

Please help

Thx
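
One thing worth double-checking (an assumption about the cause, not a confirmed diagnosis):
spark.kryoserializer.buffer.max is read when the executors and their serializers are created, so
setting it through spark.conf.set() after the session already exists generally has no effect, and
the value must stay below 2048m (2g is rejected by Spark's validation). A sketch of setting it up
front instead:

import org.apache.spark.sql.SparkSession

// Serializer settings belong on spark-submit (--conf) or on the builder,
// not in spark.conf.set() after the SparkSession exists.
val spark = SparkSession.builder()
  .appName("broadcast-join")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "1024m") // must be < 2048m
  .getOrCreate()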


Does Spark and Hive use Same SQL parser : ANTLR

2018-01-18 Thread Pralabh Kumar
Hi


Do Hive and Spark use the same SQL parser provided by ANTLR? Do they generate
the same logical plan?

Please help with this.


Regards
Pralabh Kumar


PIG to Spark

2018-01-08 Thread Pralabh Kumar
Hi

Is there a convenient way or open-source project to convert Pig scripts to
Spark?


Regards
Pralabh Kumar


Re: Spark GroupBy Save to different files

2017-09-04 Thread Pralabh Kumar
Hi Arun,

rdd1.groupBy(_.city).map(s => (s._1, s._2.toList.toString())).toDF("city", "data")
  .write.partitionBy("city").csv("/data")

should work for you.
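
A slightly fuller, self-contained sketch of the same idea, using the Person case class from the
original mail; each city ends up in its own sub-directory (city=City1, city=City2, ...) without
collecting anything on the driver:

import org.apache.spark.sql.SparkSession

case class Person(fName: String, city: String)

val spark = SparkSession.builder().appName("group-by-city").getOrCreate()
import spark.implicits._

val people = Seq(Person("A", "City1"), Person("B", "City2"), Person("C", "City1")).toDS()

// One directory per distinct city, e.g. /tmp/people/city=City1/part-...
people.write
  .partitionBy("city")
  .mode("overwrite")
  .csv("/tmp/people")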

Regards
Pralabh

On Sat, Sep 2, 2017 at 7:58 AM, Ryan  wrote:

> you may try foreachPartition
>
> On Fri, Sep 1, 2017 at 10:54 PM, asethia  wrote:
>
>> Hi,
>>
>> I have list of person records in following format:
>>
>> case class Person(fName:String, city:String)
>>
>> val l=List(Person("A","City1"),Person("B","City2"),Person("C","City1"))
>>
>> val rdd:RDD[Person]=sc.parallelize(l)
>>
>> val groupBy:RDD[(String, Iterable[Person])]=rdd.groupBy(_.city)
>>
>> I would like to save these group by records in different files (for
>> example
>> by city). Please can some one help me here.
>>
>> I tried this but not able to create those files
>>
>>  groupBy.foreach(x=>{
>> x._2.toList.toDF().rdd.saveAsObjectFile(s"file:///tmp/files/${x._1}")
>>   })
>>
>> Thanks
>> Arun
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: GC overhead exceeded

2017-08-17 Thread Pralabh Kumar
What is your executor memory? Please share the code as well.

On Fri, Aug 18, 2017 at 10:06 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

>
> HI,
>
> I am getting below error when running spark sql jobs. This error is thrown
> after running 80% of tasks. any solution?
>
> spark.storage.memoryFraction=0.4
> spark.sql.shuffle.partitions=2000
> spark.default.parallelism=100
> #spark.eventLog.enabled=false
> #spark.scheduler.revive.interval=1s
> spark.driver.memory=8g
>
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at java.util.ArrayList.subList(ArrayList.java:955)
> at java.lang.String.split(String.java:2311)
> at sun.net.util.IPAddressUtil.textToNumericFormatV4(
> IPAddressUtil.java:47)
> at java.net.InetAddress.getAllByName(InetAddress.java:1129)
> at java.net.InetAddress.getAllByName(InetAddress.java:1098)
> at java.net.InetAddress.getByName(InetAddress.java:1048)
> at org.apache.hadoop.net.NetUtils.normalizeHostName(
> NetUtils.java:562)
> at org.apache.hadoop.net.NetUtils.normalizeHostNames(
> NetUtils.java:579)
> at org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(
> CachedDNSToSwitchMapping.java:109)
> at org.apache.hadoop.yarn.util.RackResolver.coreResolve(
> RackResolver.java:101)
> at org.apache.hadoop.yarn.util.RackResolver.resolve(
> RackResolver.java:81)
> at org.apache.spark.scheduler.cluster.YarnScheduler.
> getRackForHost(YarnScheduler.scala:37)
> at org.apache.spark.scheduler.TaskSetManager.dequeueTask(
> TaskSetManager.scala:380)
> at org.apache.spark.scheduler.TaskSetManager.resourceOffer(
> TaskSetManager.scala:433)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$
> org$apache$spark$scheduler$TaskSchedulerImpl$$
> resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:276)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.
> scala:160)
> at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$
> spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(
> TaskSchedulerImpl.scala:271)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$
> resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:357)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$
> resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:355)
> at scala.collection.IndexedSeqOptimized$class.
> foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(
> ArrayOps.scala:186)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$
> resourceOffers$4.apply(TaskSchedulerImpl.scala:355)
> at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$
> resourceOffers$4.apply(TaskSchedulerImpl.scala:352)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(
> TaskSchedulerImpl.scala:352)
> at org.apache.spark.scheduler.cluster.
> CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$
> spark$scheduler$cluster$CoarseGrainedSchedulerBackend$
> DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:222)
>
>


Re: Reading Hive tables Parallel in Spark

2017-07-17 Thread Pralabh Kumar
Run the Spark jobs in a multithreaded way from the same SparkContext.

Something like this:

val spark =  SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode","FAIR")
  .enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
val hc = spark.sqlContext


val thread1 = new Thread {
 override def run {
   hc.sql("select * from table1")
 }
   }

   val thread2 = new Thread {
 override def run {
   hc.sql("select * from table2")
 }
   }

   thread1.start()
   thread2.start()
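
For the original use case (converting a few hundred Hive tables to Parquet inside a single job),
the same idea is often written with Scala Futures so the concurrency is bounded by a thread pool
and the driver waits for completion. A sketch under the assumption that each table can simply be
read through the metastore and rewritten as Parquet; table names, pool size and output paths are
placeholders:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-to-parquet")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport()
  .getOrCreate()

// Bound how many tables are converted concurrently (placeholder value).
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

val tables = Seq("db.table1", "db.table2", "db.table3") // placeholders

val conversions = tables.map { table =>
  Future {
    // Each Future submits its own Spark job; FAIR scheduling lets them share the cluster.
    spark.table(table)
      .write
      .mode("overwrite")
      .parquet(s"/archive/parquet/${table.replace('.', '/')}") // placeholder output path
  }
}

Await.result(Future.sequence(conversions), Duration.Inf)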



On Mon, Jul 17, 2017 at 5:42 PM, FN  wrote:

> Hi
> I am currently trying to parallelize reading multiple tables from Hive . As
> part of an archival framework, i need to convert few hundred tables which
> are in txt format to Parquet. For now i am calling a Spark SQL inside a for
> loop for conversion. But this execute sequential and entire process takes
> longer time to finish.
>
> I tired  submitting 4 different Spark jobs ( each having set of tables to
> be
> converted), it did give me some parallelism , but i would like to do this
> in
> single Spark job due to few limitation of our cluster and process
>
> Any help will be greatly appreciated
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Reading-Hive-tables-Parallel-in-Spark-tp28869.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: splitting columns into new columns

2017-07-17 Thread Pralabh Kumar
Hi Nayan,

Please find below a solution to your problem which works on Spark 2:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
import spark.implicits._

// Sample row: two columns, each holding several '^'-separated values.
val dataFrame = sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
  .map(s => s.split("\\|")).map(s => (s(0), s(1)))
  .toDF("phone", "contact")
dataFrame.show()

// Split every column on '^' and flatten the pieces into one Row.
val newDataSet = dataFrame.rdd.map { data =>
  val t1 = ArrayBuffer[String]()
  for (i <- 0.to(1)) {
    val col = data.get(i).asInstanceOf[String]
    for (piece <- col.split("\\^").toSeq) {
      t1 += piece
    }
  }
  Row.fromSeq(t1.seq)
}

// Derive the new column names (phone0, phone1, ..., contact0, ...) from the first row.
val firstRow = dataFrame.select("*").take(1)(0)
var schema = ""
for ((colName, idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
  val data = firstRow(idx).asInstanceOf[String].split("\\^")
  var j = 0
  for (d <- data) {
    schema = schema + colName + j + ","
    j = j + 1
  }
}
schema = schema.substring(0, schema.length - 1)

val sqlSchema = StructType(schema.split(",").map(s => StructField(s, StringType, false)))
sqlContext.createDataFrame(newDataSet, sqlSchema).show()

Regards
Pralabh Kumar


On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma <nayansharm...@gmail.com>
wrote:

> If I have 2-3 values in a column then I can easily separate it and create
> new columns with withColumn option.
> but I am trying to achieve it in loop and dynamically generate the new
> columns as many times the ^ has occurred in column values
>
> Can it be achieve in this way.
>
> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> You are looking for explode function.
>
> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com>
> wrote:
>
>> I’ve a Dataframe where in some columns there are multiple values, always
>> separated by ^
>>
>> phone|contact|
>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>>
>> phone1|phone2|contact1|contact2|
>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>>
>> How can this be achieved using loop as the separator between column values
>> are not constant.
>> data.withColumn("phone",split($"phone","\\^")).select($"
>> phon‌​e".getItem(0).as("ph‌​one1"),$"phone".getI‌​tem(1).as("phone2”))
>>  I though of doing this way but the problem is  column are having 100+
>> separator between the column values
>>
>>
>>
>> Thank you,
>> Nayan
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


Re: Withcolumn date with sysdate

2017-06-30 Thread Pralabh Kumar
Put the default value inside lit:

df.withColumn("date", lit("constant value"))

On Fri, Jun 30, 2017 at 10:20 PM, sudhir k  wrote:

> Can we add a column to dataframe with a default value like sysdate .. I am
> calling my udf but it is throwing error col expected .
>
> On spark shell
> df.withcolumn("date",curent_date) works I need similiar for scala program
> which I can build in a jar
>
>
> Thanks,
> Sudhir
> --
> Sent from Gmail Mobile
>


Re: (Spark-ml) java.util.NosuchElementException: key not found exception on doing prediction and computing test error.

2017-06-28 Thread Pralabh Kumar
Hi Neha

This generally occurs when your test data contains a value of a categorical
variable which is not present in your training data. For example, you have a
column DAYS with values M, T, W in the training data; when your test data
contains F, you get a key-not-found exception. Please look into this, and if
that's not the case, could you please share your code and training/testing
data for a better understanding?
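
If a StringIndexer (or similar indexing stage) is part of the pipeline, its handleInvalid
parameter is the usual knob for this: "skip" drops rows whose categorical value was never seen
during fitting instead of failing with a key-not-found error (newer Spark versions also offer
"keep"). A sketch with assumed column and DataFrame names:

import org.apache.spark.ml.feature.StringIndexer

// trainingDf / testDf and the column names are placeholders for the actual data.
val indexer = new StringIndexer()
  .setInputCol("DAYS")
  .setOutputCol("DAYS_indexed")
  .setHandleInvalid("skip") // drop unseen categories at transform time instead of failing

val indexerModel = indexer.fit(trainingDf)
val indexedTest  = indexerModel.transform(testDf)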

Regards
Pralabh Kumar

On Wed, Jun 28, 2017 at 11:45 AM, neha nihal <nehaniha...@gmail.com> wrote:

>
> Hi,
>
> I am using Apache spark 2.0.2 randomforest ml (standalone mode) for text
> classification. TF-IDF feature extractor is also used. The training part
> runs without any issues and returns 100% accuracy. But when I am trying to
> do prediction using trained model and compute test error, it fails with
> java.util.NosuchElementException: key not found exception.
> Any help will be much appreciated.
>
> Thanks
>
>


Re: Question about Parallel Stages in Spark

2017-06-26 Thread Pralabh Kumar
I think my words were also misunderstood. My point is that they will not be
submitted together, since they are part of one thread.

val spark =  SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode","FAIR")
  .enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
sc.parallelize(List(1.to(1000))).map(s=>Thread.sleep(1)).collect()
Thread.sleep(1000)


I ran this and the submission times of the two jobs are different.

Please let me know if I am wrong.

On Tue, Jun 27, 2017 at 9:17 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> My words cause misunderstanding.
> Step 1:A is submited to spark.
> Step 2:B is submitted to spark.
>
> Spark gets two independent jobs.The FAIR  is used to schedule A and B.
>
> Jeffrey' code did not cause two submit.
>
>
>
> ---Original---
> *From:* "Pralabh Kumar"<pralabhku...@gmail.com>
> *Date:* 2017/6/27 12:09:27
> *To:* "萝卜丝炒饭"<1427357...@qq.com>;
> *Cc:* "user"<user@spark.apache.org>;"satishl"<satish.la...@gmail.com>;"Bryan
> Jeffrey"<bryan.jeff...@gmail.com>;
> *Subject:* Re: Question about Parallel Stages in Spark
>
> Hi
>
> I don't think so spark submit ,will receive two submits .  Its will
> execute one submit and then to next one .  If the application is
> multithreaded ,and two threads are calling spark submit and one time , then
> they will run parallel provided the scheduler is FAIR and task slots are
> available .
>
> But in one thread ,one submit will complete and then the another one will
> start . If there are independent stages in one job, then those will run
> parallel.
>
> I agree with Bryan Jeffrey .
>
>
> Regards
> Pralabh Kumar
>
> On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>
>> I think the spark cluster receives two submits, A and B.
>> The FAIR  is used to schedule A and B.
>> I am not sure about this.
>>
>> ---Original---
>> *From:* "Bryan Jeffrey"<bryan.jeff...@gmail.com>
>> *Date:* 2017/6/27 08:55:42
>> *To:* "satishl"<satish.la...@gmail.com>;
>> *Cc:* "user"<user@spark.apache.org>;
>> *Subject:* Re: Question about Parallel Stages in Spark
>>
>> Hello.
>>
>> The driver is running the individual operations in series, but each
>> operation is parallelized internally.  If you want them run in parallel you
>> need to provide the driver a mechanism to thread the job scheduling out:
>>
>> val rdd1 = sc.parallelize(1 to 10)
>> val rdd2 = sc.parallelize(1 to 20)
>>
>> var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, 
>> rdd2).zipWithIndex.par
>>
>> thingsToDo.foreach { case(rdd, index) =>
>>   for(i <- (1 to 1))
>> logger.info(s"Index ${index} - ${rdd.sum()}")
>> }
>>
>>
>> This will run both operations in parallel.
>>
>>
>> On Mon, Jun 26, 2017 at 8:10 PM, satishl <satish.la...@gmail.com> wrote:
>>
>>> For the below code, since rdd1 and rdd2 dont depend on each other - i was
>>> expecting that both first and second printlns would be interwoven.
>>> However -
>>> the spark job runs all "first " statements first and then all "seocnd"
>>> statements next in serial fashion. I have set spark.scheduler.mode =
>>> FAIR.
>>> obviously my understanding of parallel stages is wrong. What am I
>>> missing?
>>>
>>> val rdd1 = sc.parallelize(1 to 100)
>>> val rdd2 = sc.parallelize(1 to 100)
>>>
>>> for (i <- (1 to 100))
>>>   println("first: " + rdd1.sum())
>>> for (i <- (1 to 100))
>>>   println("second" + rdd2.sum())
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-Spar
>>> k-tp28793.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Question about Parallel Stages in Spark

2017-06-26 Thread Pralabh Kumar
Hi

I don't think Spark will receive two submits; it will execute one submit and
then move to the next one. If the application is multithreaded and two threads
are calling submit at the same time, then they will run in parallel, provided
the scheduler is FAIR and task slots are available.

But within one thread, one submit will complete and then the other one will
start. If there are independent stages in one job, those will run in parallel.

I agree with Bryan Jeffrey .


Regards
Pralabh Kumar

On Tue, Jun 27, 2017 at 9:03 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

> I think the spark cluster receives two submits, A and B.
> The FAIR  is used to schedule A and B.
> I am not sure about this.
>
> ---Original---
> *From:* "Bryan Jeffrey"<bryan.jeff...@gmail.com>
> *Date:* 2017/6/27 08:55:42
> *To:* "satishl"<satish.la...@gmail.com>;
> *Cc:* "user"<user@spark.apache.org>;
> *Subject:* Re: Question about Parallel Stages in Spark
>
> Hello.
>
> The driver is running the individual operations in series, but each
> operation is parallelized internally.  If you want them run in parallel you
> need to provide the driver a mechanism to thread the job scheduling out:
>
> val rdd1 = sc.parallelize(1 to 10)
> val rdd2 = sc.parallelize(1 to 20)
>
> var thingsToDo: ParArray[(RDD[Int], Int)] = Array(rdd1, rdd2).zipWithIndex.par
>
> thingsToDo.foreach { case(rdd, index) =>
>   for(i <- (1 to 1))
> logger.info(s"Index ${index} - ${rdd.sum()}")
> }
>
>
> This will run both operations in parallel.
>
>
> On Mon, Jun 26, 2017 at 8:10 PM, satishl <satish.la...@gmail.com> wrote:
>
>> For the below code, since rdd1 and rdd2 dont depend on each other - i was
>> expecting that both first and second printlns would be interwoven.
>> However -
>> the spark job runs all "first " statements first and then all "seocnd"
>> statements next in serial fashion. I have set spark.scheduler.mode = FAIR.
>> obviously my understanding of parallel stages is wrong. What am I missing?
>>
>> val rdd1 = sc.parallelize(1 to 100)
>> val rdd2 = sc.parallelize(1 to 100)
>>
>> for (i <- (1 to 100))
>>   println("first: " + rdd1.sum())
>> for (i <- (1 to 100))
>>   println("second" + rdd2.sum())
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Question-about-Parallel-Stages-in-
>> Spark-tp28793.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Re: spark2.1 kafka0.10

2017-06-22 Thread Pralabh Kumar
It looks like the replicas for your partitions are failing. If you have more
brokers, can you try increasing the number of replicas, just to make sure at
least one leader is always available?

On Thu, Jun 22, 2017 at 10:34 AM, lk_spark <lk_sp...@163.com> wrote:

> each topic have 5 partition  ,  2 replicas .
>
> 2017-06-22
> --
> lk_spark
> ------
>
> *From:* Pralabh Kumar <pralabhku...@gmail.com>
> *Sent:* 2017-06-22 17:23
> *Subject:* Re: spark2.1 kafka0.10
> *To:* "lk_spark"<lk_sp...@163.com>
> *Cc:* "user.spark"<user@spark.apache.org>
>
> How many replicas ,you have for this topic .
>
> On Thu, Jun 22, 2017 at 9:19 AM, lk_spark <lk_sp...@163.com> wrote:
>
>> java.lang.IllegalStateException: No current assignment for partition
>> pages-2
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionStat
>> e.assignedState(SubscriptionState.java:264)
>>  at org.apache.kafka.clients.consumer.internals.SubscriptionStat
>> e.needOffsetReset(SubscriptionState.java:336)
>>  at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(
>> KafkaConsumer.java:1236)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
>> latestOffsets(DirectKafkaInputDStream.scala:197)
>>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
>> compute(DirectKafkaInputDStream.scala:214)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1$$anonfun$1.apply(DStream.scala:340)
>>  at org.apache.spark.streaming.dstream.DStream.createRDDWithLoca
>> lProperties(DStream.scala:415)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1.apply(DStream.scala:335)
>>  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCom
>> pute$1.apply(DStream.scala:333)
>>  at scala.Option.orElse(Option.scala:289)
>>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStr
>> eam.scala:330)
>>  at org.apache.spark.streaming.dstream.ForEachDStream.generateJo
>> b(ForEachDStream.scala:48)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DSt
>> reamGraph.scala:117)
>>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DSt
>> reamGraph.scala:116)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:241)
>>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(Tr
>> aversableLike.scala:241)
>>  at scala.collection.mutable.ResizableArray$class.foreach(Resiza
>> bleArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>  at scala.collection.TraversableLike$class.flatMap(TraversableLi
>> ke.scala:241)
>>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStream
>> Graph.scala:116)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$
>> 3.apply(JobGenerator.scala:249)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$
>> 3.apply(JobGenerator.scala:247)
>>  at scala.util.Try$.apply(Try.scala:192)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.generateJo
>> bs(JobGenerator.scala:247)
>>  at org.apache.spark.streaming.scheduler.JobGenerator.org$apache
>> $spark$streaming$scheduler$JobGenerator$$processEvent(
>> JobGenerator.scala:183)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:89)
>>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.
>> onReceive(JobGenerator.scala:88)
>>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> 2017-06-22
>> --
>> lk_spark
>> --
>>
>> *发件人:*"lk_spark"<lk_sp...@163.com>
>> *发送时间:*2017-06-22 11:13
>> *主题:*spark2.1 kafka0.10
>> *收件人:*"user.spark"<user@spark.apache.org>
>> *抄送:*
>>
>> hi,all:
>> when I run stream application for a few minutes ,I got this error :
>>
>> 17/06/22 10:34:56 INFO ConsumerCoordinator: Rev

Re: Broadcasts & Storage Memory

2017-06-22 Thread Pralabh Kumar
Hi

Broadcast variables are definitely stored in the spark.memory.storageFraction
region.

If we go into the code of TorrentBroadcast.scala (the writeBlocks method) and
follow it through BlockManager into MemoryStore: deserialization of the
variables occurs in unroll memory, and the blocks are then transferred to
storage memory.

memoryManager.synchronized {
  releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
  val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
}

So broadcast variables are definitely stored in spark.memory.storageFraction.
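
One rough way to cross-check what the UI shows from the driver side (a sketch; getExecutorMemoryStatus
reports the maximum and remaining memory available for caching per executor, so broadcast blocks
should reduce the remaining figure once they are used):

// sc is an existing SparkContext; the payload size is arbitrary.
def storageStatus(label: String): Unit =
  sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
    println(s"$label $executor max=${maxMem / 1024 / 1024}MB remaining=${remaining / 1024 / 1024}MB")
  }

storageStatus("before")
val payload = sc.broadcast(Array.fill(10 * 1024 * 1024)(1.toByte)) // ~10 MB broadcast
sc.parallelize(1 to 100).map(_ => payload.value.length).count()    // force it onto executors
storageStatus("after")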


Can you explain how you are seeing a smaller amount of memory used for
broadcast variables on a given executor in the UI?

Regards
Pralabh Kumar

On Thu, Jun 22, 2017 at 4:39 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Satish,
>
> I agree - that was my impression too. However I am seeing a smaller set of
> storage memory used on a given executor than the amount of memory required
> for my broadcast variables. I am wondering if the statistics in the ui are
> incorrect or if the broadcasts are simply not a part of that storage memory
> fraction.
>
> Bryan Jeffrey
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
>
>
>
> On Wed, Jun 21, 2017 at 6:48 PM -0400, "satish lalam" <
> satish.la...@gmail.com> wrote:
>
> My understanding is - it from storageFraction. Here cached blocks are
>> immune to eviction - so both persisted RDDs and broadcast variables sit
>> here. Ref
>> <https://image.slidesharecdn.com/sparkinternalsworkshoplatest-160303190243/95/apache-spark-in-depth-core-concepts-architecture-internals-20-638.jpg?cb=1457597704>
>>
>>
>> On Wed, Jun 21, 2017 at 1:43 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>
>>> Hello.
>>>
>>> Question: Do broadcast variables stored on executors count as part of
>>> 'storage memory' or other memory?
>>>
>>> A little bit more detail:
>>>
>>> I understand that we have two knobs to control memory allocation:
>>> - spark.memory.fraction
>>> - spark.memory.storageFraction
>>>
>>> My understanding is that spark.memory.storageFraction controls the
>>> amount of memory allocated for cached RDDs.  spark.memory.fraction controls
>>> how much memory is allocated to Spark operations (task serialization,
>>> operations, etc.), w/ the remainder reserved for user data structures,
>>> Spark internal metadata, etc.  This includes the storage memory for cached
>>> RDDs.
>>>
>>> You end up with executor memory that looks like the following:
>>> All memory: 0-100
>>> Spark memory: 0-75
>>> RDD Storage: 0-37
>>> Other Spark: 38-75
>>> Other Reserved: 76-100
>>>
>>> Where do broadcast variables fall into the mix?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>
>>


Re: spark2.1 kafka0.10

2017-06-21 Thread Pralabh Kumar
How many replicas do you have for this topic?

On Thu, Jun 22, 2017 at 9:19 AM, lk_spark  wrote:

> java.lang.IllegalStateException: No current assignment for partition
> pages-2
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.
> assignedState(SubscriptionState.java:264)
>  at org.apache.kafka.clients.consumer.internals.SubscriptionState.
> needOffsetReset(SubscriptionState.java:336)
>  at org.apache.kafka.clients.consumer.KafkaConsumer.
> seekToEnd(KafkaConsumer.java:1236)
>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.
> latestOffsets(DirectKafkaInputDStream.scala:197)
>  at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(
> DirectKafkaInputDStream.scala:214)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>  at org.apache.spark.streaming.dstream.DStream.
> createRDDWithLocalProperties(DStream.scala:415)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:335)
>  at org.apache.spark.streaming.dstream.DStream$$anonfun$
> getOrCompute$1.apply(DStream.scala:333)
>  at scala.Option.orElse(Option.scala:289)
>  at org.apache.spark.streaming.dstream.DStream.getOrCompute(
> DStream.scala:330)
>  at org.apache.spark.streaming.dstream.ForEachDStream.
> generateJob(ForEachDStream.scala:48)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:117)
>  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(
> DStreamGraph.scala:116)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
>  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(
> TraversableLike.scala:241)
>  at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.flatMap(
> TraversableLike.scala:241)
>  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>  at org.apache.spark.streaming.DStreamGraph.generateJobs(
> DStreamGraph.scala:116)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:249)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$
> anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192)
>  at org.apache.spark.streaming.scheduler.JobGenerator.
> generateJobs(JobGenerator.scala:247)
>  at org.apache.spark.streaming.scheduler.JobGenerator.org$
> apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.
> scala:183)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:89)
>  at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> 2017-06-22
> --
> lk_spark
> --
>
> *From:*"lk_spark"
> *Sent:*2017-06-22 11:13
> *Subject:*spark2.1 kafka0.10
> *To:*"user.spark"
> *Cc:*
>
> Hi all:
> When I run the stream application for a few minutes, I get this error:
>
> 17/06/22 10:34:56 INFO ConsumerCoordinator: Revoking previously assigned
> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
> weibocomment-0, bizs-2, pages-0, bizs-4, pages-2, weibo-0, pages-4,
> weibo-4, clicks-1, comment-1, weibo-2, clicks-3, weibocomment-4,
> weibocomment-2, profile-0, profile-2, profile-4, cwb-4, cwb-2, cwb-0,
> bizs-0, bizs-3, pages-1, weibo-1, pages-3, clicks-2, weibo-3, clicks-4,
> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
> 17/06/22 10:34:56 INFO AbstractCoordinator: (Re-)joining group youedata1
> 17/06/22 10:34:56 INFO AbstractCoordinator: Successfully joined group
> youedata1 with generation 3
> 17/06/22 10:34:56 INFO ConsumerCoordinator: Setting newly assigned
> partitions [comment-0, profile-1, profile-3, cwb-3, bizs-1, cwb-1,
> weibocomment-0, bizs-2, bizs-4, pages-4, weibo-4, clicks-1, comment-1,
> clicks-3, weibocomment-4, weibocomment-2, profile-0, profile-2, profile-4,
> cwb-4, cwb-2, cwb-0, bizs-0, bizs-3, pages-3, clicks-2, weibo-3, clicks-4,
> comment-2, weibocomment-3, clicks-0, weibocomment-1] for group youedata1
> 17/06/22 10:34:56 ERROR JobScheduler: Error generating jobs for time
> 1498098896000 ms
> java.lang.IllegalStateException: No current assignment for partition
> pages-2
>
> I don't know why?
>
> 

Re: Best alternative for Category Type in Spark Dataframe

2017-06-17 Thread Pralabh Kumar
Makes sense :)
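For completeness, a minimal sketch of the SQLTransformer variant mentioned below (note that the IN-list values need to be quoted string literals):

import org.apache.spark.ml.feature.SQLTransformer

// __THIS__ is SQLTransformer's placeholder for the input dataset.
val emotionFilter = new SQLTransformer().setStatement(
  "SELECT * FROM __THIS__ WHERE EMOTION IN ('HAPPY','SAD','ANGRY','NEUTRAL','NA')")

// emotionFilter can then sit in a Pipeline's stages like any other transformer.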

On Sun, Jun 18, 2017 at 8:38 AM, 颜发才(Yan Facai) <facai@gmail.com> wrote:

> Yes, perhaps we could use SQLTransformer as well.
>
> http://spark.apache.org/docs/latest/ml-features.html#sqltransformer
>
> On Sun, Jun 18, 2017 at 10:47 AM, Pralabh Kumar <pralabhku...@gmail.com>
> wrote:
>
>> Hi Yan
>>
>> Yes sql is good option , but if we have to create ML Pipeline , then
>> having transformers and set it into pipeline stages ,would be better option
>> .
>>
>> Regards
>> Pralabh Kumar
>>
>> On Sun, Jun 18, 2017 at 4:23 AM, 颜发才(Yan Facai) <facai@gmail.com>
>> wrote:
>>
>>> To filter data, how about using sql?
>>>
>>> df.createOrReplaceTempView("df")
>>> val sqlDF = spark.sql("SELECT * FROM df WHERE EMOTION IN
>>> ('HAPPY','SAD','ANGRY','NEUTRAL','NA')")
>>>
>>> https://spark.apache.org/docs/latest/sql-programming-guide.html#sql
>>>
>>>
>>>
>>> On Fri, Jun 16, 2017 at 11:28 PM, Pralabh Kumar <pralabhku...@gmail.com>
>>> wrote:
>>>
>>>> Hi Saatvik
>>>>
>>>> You can write your own transformer to make sure that column contains
>>>> ,value which u provided , and filter out rows which doesn't follow the
>>>> same.
>>>>
>>>> Something like this
>>>>
>>>>
>>>> case class CategoryTransformer(override val uid : String) extends
>>>> Transformer{
>>>>   override def transform(inputData: DataFrame): DataFrame = {
>>>> inputData.select("col1").filter("col1 in ('happy')")
>>>>   }
>>>>   override def copy(extra: ParamMap): Transformer = ???
>>>>   @DeveloperApi
>>>>   override def transformSchema(schema: StructType): StructType ={
>>>>schema
>>>>   }
>>>> }
>>>>
>>>>
>>>> Usage
>>>>
>>>> val data = sc.parallelize(List("abce","happy")).toDF("col1")
>>>> val trans = new CategoryTransformer("1")
>>>> data.show()
>>>> trans.transform(data).show()
>>>>
>>>>
>>>> This transformer will make sure , you always have values in col1 as
>>>> provided by you.
>>>>
>>>>
>>>> Regards
>>>> Pralabh Kumar
>>>>
>>>> On Fri, Jun 16, 2017 at 8:10 PM, Saatvik Shah <
>>>> saatvikshah1...@gmail.com> wrote:
>>>>
>>>>> Hi Pralabh,
>>>>>
>>>>> I want the ability to create a column such that its values be
>>>>> restricted to a specific set of predefined values.
>>>>> For example, suppose I have a column called EMOTION: I want to ensure
>>>>> each row value is one of HAPPY,SAD,ANGRY,NEUTRAL,NA.
>>>>>
>>>>> Thanks and Regards,
>>>>> Saatvik Shah
>>>>>
>>>>>
>>>>> On Fri, Jun 16, 2017 at 10:30 AM, Pralabh Kumar <
>>>>> pralabhku...@gmail.com> wrote:
>>>>>
>>>>>> Hi satvik
>>>>>>
>>>>>> Can u please provide an example of what exactly you want.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 16-Jun-2017 7:40 PM, "Saatvik Shah" <saatvikshah1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Yan,
>>>>>>>
>>>>>>> Basically the reason I was looking for the categorical datatype is
>>>>>>> as given here
>>>>>>> <https://pandas.pydata.org/pandas-docs/stable/categorical.html>:
>>>>>>> ability to fix column values to specific categories. Is it possible to
>>>>>>> create a user defined data type which could do so?
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Saatvik Shah
>>>>>>>
>>>>>>> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) <facai@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> You can use some Transformers to handle categorical data,
>>>>>>>> For example,
>>>>>>>> StringIndexer encodes a string column of labels to a column of
>>>>>>>> label indices:
>>>>>>>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>>>>>>>> saatvikshah1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I'm trying to convert a Pandas -> Spark dataframe. One of the
>>>>>>>>> columns I have
>>>>>>>>> is of the Category type in Pandas. But there does not seem to be
>>>>>>>>> support for
>>>>>>>>> this same type in Spark. What is the best alternative?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>>>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>>>>>>>> Spark-Dataframe-tp28764.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>>>
>>>>>>>>> 
>>>>>>>>> -
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> *Saatvik Shah,*
>>>>>>> *1st  Year,*
>>>>>>> *Masters in the School of Computer Science,*
>>>>>>> *Carnegie Mellon University*
>>>>>>>
>>>>>>> *https://saatvikshah1994.github.io/
>>>>>>> <https://saatvikshah1994.github.io/>*
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Saatvik Shah,*
>>>>> *1st  Year,*
>>>>> *Masters in the School of Computer Science,*
>>>>> *Carnegie Mellon University*
>>>>>
>>>>> *https://saatvikshah1994.github.io/
>>>>> <https://saatvikshah1994.github.io/>*
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-17 Thread Pralabh Kumar
Hi Yan

Yes, SQL is a good option, but if we have to create an ML Pipeline, then
writing transformers and setting them as pipeline stages would be the better option.
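As a rough sketch (assuming a custom transformer like the CategoryTransformer quoted below, plus a StringIndexer stage purely for illustration), the wiring could look like this:

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.StringIndexer

// Filter to the allowed categories first, then index the column.
val categoryFilter = CategoryTransformer("category-filter-1")
val indexer = new StringIndexer().setInputCol("col1").setOutputCol("col1_idx")

val pipeline = new Pipeline()
  .setStages(Array[PipelineStage](categoryFilter, indexer))

// `data` is assumed to be a DataFrame with a col1 column.
val model = pipeline.fit(data)
val indexed = model.transform(data)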

Regards
Pralabh Kumar

On Sun, Jun 18, 2017 at 4:23 AM, 颜发才(Yan Facai) <facai@gmail.com> wrote:

> To filter data, how about using sql?
>
> df.createOrReplaceTempView("df")
> val sqlDF = spark.sql("SELECT * FROM df WHERE EMOTION IN
> ('HAPPY','SAD','ANGRY','NEUTRAL','NA')")
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html#sql
>
>
>
> On Fri, Jun 16, 2017 at 11:28 PM, Pralabh Kumar <pralabhku...@gmail.com>
> wrote:
>
>> Hi Saatvik
>>
>> You can write your own transformer to make sure that column contains
>> ,value which u provided , and filter out rows which doesn't follow the
>> same.
>>
>> Something like this
>>
>>
>> case class CategoryTransformer(override val uid : String) extends
>> Transformer{
>>   override def transform(inputData: DataFrame): DataFrame = {
>> inputData.select("col1").filter("col1 in ('happy')")
>>   }
>>   override def copy(extra: ParamMap): Transformer = ???
>>   @DeveloperApi
>>   override def transformSchema(schema: StructType): StructType ={
>>schema
>>   }
>> }
>>
>>
>> Usage
>>
>> val data = sc.parallelize(List("abce","happy")).toDF("col1")
>> val trans = new CategoryTransformer("1")
>> data.show()
>> trans.transform(data).show()
>>
>>
>> This transformer will make sure , you always have values in col1 as
>> provided by you.
>>
>>
>> Regards
>> Pralabh Kumar
>>
>> On Fri, Jun 16, 2017 at 8:10 PM, Saatvik Shah <saatvikshah1...@gmail.com>
>> wrote:
>>
>>> Hi Pralabh,
>>>
>>> I want the ability to create a column such that its values be restricted
>>> to a specific set of predefined values.
>>> For example, suppose I have a column called EMOTION: I want to ensure
>>> each row value is one of HAPPY,SAD,ANGRY,NEUTRAL,NA.
>>>
>>> Thanks and Regards,
>>> Saatvik Shah
>>>
>>>
>>> On Fri, Jun 16, 2017 at 10:30 AM, Pralabh Kumar <pralabhku...@gmail.com>
>>> wrote:
>>>
>>>> Hi satvik
>>>>
>>>> Can u please provide an example of what exactly you want.
>>>>
>>>>
>>>>
>>>> On 16-Jun-2017 7:40 PM, "Saatvik Shah" <saatvikshah1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Yan,
>>>>>
>>>>> Basically the reason I was looking for the categorical datatype is as
>>>>> given here
>>>>> <https://pandas.pydata.org/pandas-docs/stable/categorical.html>:
>>>>> ability to fix column values to specific categories. Is it possible to
>>>>> create a user defined data type which could do so?
>>>>>
>>>>> Thanks and Regards,
>>>>> Saatvik Shah
>>>>>
>>>>> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) <facai@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You can use some Transformers to handle categorical data,
>>>>>> For example,
>>>>>> StringIndexer encodes a string column of labels to a column of label
>>>>>> indices:
>>>>>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>>>>>> saatvikshah1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I'm trying to convert a Pandas -> Spark dataframe. One of the
>>>>>>> columns I have
>>>>>>> is of the Category type in Pandas. But there does not seem to be
>>>>>>> support for
>>>>>>> this same type in Spark. What is the best alternative?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context: http://apache-spark-user-list.
>>>>>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>>>>>> Spark-Dataframe-tp28764.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> 
>>>>>>> -
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Saatvik Shah,*
>>>>> *1st  Year,*
>>>>> *Masters in the School of Computer Science,*
>>>>> *Carnegie Mellon University*
>>>>>
>>>>> *https://saatvikshah1994.github.io/
>>>>> <https://saatvikshah1994.github.io/>*
>>>>>
>>>>
>>>
>>>
>>> --
>>> *Saatvik Shah,*
>>> *1st  Year,*
>>> *Masters in the School of Computer Science,*
>>> *Carnegie Mellon University*
>>>
>>> *https://saatvikshah1994.github.io/ <https://saatvikshah1994.github.io/>*
>>>
>>
>>
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-16 Thread Pralabh Kumar
Hi Saatvik

You can write your own transformer to make sure that the column contains only
the values you provide, and filter out any rows that don't.

Something like this


import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Keeps only the rows whose col1 value is in the allowed category set.
case class CategoryTransformer(override val uid: String) extends Transformer {
  override def transform(inputData: Dataset[_]): DataFrame = {
    inputData.select("col1").filter("col1 in ('happy')")
  }
  // defaultCopy re-instantiates the transformer with the extra params applied.
  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
  @DeveloperApi
  override def transformSchema(schema: StructType): StructType = schema
}


Usage

val data = sc.parallelize(List("abce","happy")).toDF("col1")
val trans = new CategoryTransformer("1")
data.show()
trans.transform(data).show()


This transformer will make sure that col1 only ever contains the values you
provided.


Regards
Pralabh Kumar

On Fri, Jun 16, 2017 at 8:10 PM, Saatvik Shah <saatvikshah1...@gmail.com>
wrote:

> Hi Pralabh,
>
> I want the ability to create a column such that its values be restricted
> to a specific set of predefined values.
> For example, suppose I have a column called EMOTION: I want to ensure each
> row value is one of HAPPY,SAD,ANGRY,NEUTRAL,NA.
>
> Thanks and Regards,
> Saatvik Shah
>
>
> On Fri, Jun 16, 2017 at 10:30 AM, Pralabh Kumar <pralabhku...@gmail.com>
> wrote:
>
>> Hi satvik
>>
>> Can u please provide an example of what exactly you want.
>>
>>
>>
>> On 16-Jun-2017 7:40 PM, "Saatvik Shah" <saatvikshah1...@gmail.com> wrote:
>>
>>> Hi Yan,
>>>
>>> Basically the reason I was looking for the categorical datatype is as
>>> given here
>>> <https://pandas.pydata.org/pandas-docs/stable/categorical.html>:
>>> ability to fix column values to specific categories. Is it possible to
>>> create a user defined data type which could do so?
>>>
>>> Thanks and Regards,
>>> Saatvik Shah
>>>
>>> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) <facai@gmail.com>
>>> wrote:
>>>
>>>> You can use some Transformers to handle categorical data,
>>>> For example,
>>>> StringIndexer encodes a string column of labels to a column of label
>>>> indices:
>>>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>>>
>>>>
>>>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>>>> saatvikshah1...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I'm trying to convert a Pandas -> Spark dataframe. One of the columns
>>>>> I have
>>>>> is of the Category type in Pandas. But there does not seem to be
>>>>> support for
>>>>> this same type in Spark. What is the best alternative?
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context: http://apache-spark-user-list.
>>>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>>>> Spark-Dataframe-tp28764.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> *Saatvik Shah,*
>>> *1st  Year,*
>>> *Masters in the School of Computer Science,*
>>> *Carnegie Mellon University*
>>>
>>> *https://saatvikshah1994.github.io/ <https://saatvikshah1994.github.io/>*
>>>
>>
>
>
> --
> *Saatvik Shah,*
> *1st  Year,*
> *Masters in the School of Computer Science,*
> *Carnegie Mellon University*
>
> *https://saatvikshah1994.github.io/ <https://saatvikshah1994.github.io/>*
>


Re: Best alternative for Category Type in Spark Dataframe

2017-06-16 Thread Pralabh Kumar
Hi Saatvik

Can you please provide an example of what exactly you want?



On 16-Jun-2017 7:40 PM, "Saatvik Shah"  wrote:

> Hi Yan,
>
> Basically the reason I was looking for the categorical datatype is as
> given here :
> ability to fix column values to specific categories. Is it possible to
> create a user defined data type which could do so?
>
> Thanks and Regards,
> Saatvik Shah
>
> On Fri, Jun 16, 2017 at 1:42 AM, 颜发才(Yan Facai) 
> wrote:
>
>> You can use some Transformers to handle categorical data,
>> For example,
>> StringIndexer encodes a string column of labels to a column of label
>> indices:
>> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
>>
>>
>> On Thu, Jun 15, 2017 at 10:19 PM, saatvikshah1994 <
>> saatvikshah1...@gmail.com> wrote:
>>
>>> Hi,
>>> I'm trying to convert a Pandas -> Spark dataframe. One of the columns I
>>> have
>>> is of the Category type in Pandas. But there does not seem to be support
>>> for
>>> this same type in Spark. What is the best alternative?
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Best-alternative-for-Category-Type-in-
>>> Spark-Dataframe-tp28764.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
>
> --
> *Saatvik Shah,*
> *1st  Year,*
> *Masters in the School of Computer Science,*
> *Carnegie Mellon University*
>
> *https://saatvikshah1994.github.io/ *
>


Re: Re: how to call udf with parameters

2017-06-15 Thread Pralabh Kumar
import org.apache.spark.sql.functions.{lit, udf}

// Scalar arguments must be wrapped with lit() so they are passed as Columns.
val getlength = udf((idx1: Int, idx2: Int, data: String) => data.substring(idx1, idx2))

data.select(getlength(lit(1), lit(2), data("col1"))).collect()
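An alternative sketch, if wrapping every scalar in lit() gets noisy: close over the scalar parameters when building the UDF, so only the column is passed at call time. The body below is just a stand-in for the HanLP segmentation in the original question:

import org.apache.spark.sql.functions.{col, udf}

def ssplit2(delNum: Boolean, delEn: Boolean, minTermLen: Int) =
  udf { (sentence: String) =>
    sentence.split("\\s+")                                // placeholder for HanLP.segment
      .filter(t => !(delNum && t.forall(_.isDigit)))      // optionally drop purely numeric terms
      .filter(t => !(delEn && t.matches("[A-Za-z]+")))    // optionally drop purely English terms
      .filter(_.length >= minTermLen)
      .toSeq
  }

// Usage: the flags and the minimum length are plain Scala values, no lit() needed.
// input.select(ssplit2(delNum = true, delEn = true, minTermLen = 2)(col("text")).as("words"))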

On Fri, Jun 16, 2017 at 10:22 AM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Use lit , give me some time , I'll provide an example
>
> On 16-Jun-2017 10:15 AM, "lk_spark" <lk_sp...@163.com> wrote:
>
>> Thanks Kumar, I want to know how to call a udf with multiple parameters,
>> maybe a udf to make a substr function. How can I pass parameters with begin
>> and end index? I tried it with errors. Can the udf parameters only be of
>> Column type?
>>
>> 2017-06-16
>> ------
>> lk_spark
>> --
>>
>> *From:*Pralabh Kumar <pralabhku...@gmail.com>
>> *Sent:*2017-06-16 17:49
>> *Subject:*Re: how to call udf with parameters
>> *To:*"lk_spark"<lk_sp...@163.com>
>> *Cc:*"user.spark"<user@spark.apache.org>
>>
>> sample UDF
>> val getlength=udf((data:String)=>data.length())
>> data.select(getlength(data("col1")))
>>
>> On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:
>>
>>> hi,all
>>>  I define a udf with multiple parameters  ,but I don't know how to
>>> call it with DataFrame
>>>
>>> UDF:
>>>
>>> def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean,
>>> minTermLen: Int) =>
>>> val terms = HanLP.segment(sentence).asScala
>>> .
>>>
>>> Call :
>>>
>>> scala> val output = input.select(ssplit2($"text",t
>>> rue,true,2).as('words))
>>> :40: error: type mismatch;
>>>  found   : Boolean(true)
>>>  required: org.apache.spark.sql.Column
>>>val output = input.select(ssplit2($"text",t
>>> rue,true,2).as('words))
>>>  ^
>>> :40: error: type mismatch;
>>>  found   : Boolean(true)
>>>  required: org.apache.spark.sql.Column
>>>val output = input.select(ssplit2($"text",t
>>> rue,true,2).as('words))
>>>   ^
>>> :40: error: type mismatch;
>>>  found   : Int(2)
>>>  required: org.apache.spark.sql.Column
>>>val output = input.select(ssplit2($"text",t
>>> rue,true,2).as('words))
>>>^
>>>
>>> scala> val output = input.select(ssplit2($"text",$
>>> "true",$"true",$"2").as('words))
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given
>>> input columns: [id, text];;
>>> 'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
>>> +- Project [_1#2 AS id#5, _2#3 AS text#6]
>>>+- LocalRelation [_1#2, _2#3]
>>>
>>> I need help!!
>>>
>>>
>>> 2017-06-16
>>> --
>>> lk_spark
>>>
>>
>>


Re: Re: how to call udf with parameters

2017-06-15 Thread Pralabh Kumar
Use lit; give me some time and I'll provide an example.

On 16-Jun-2017 10:15 AM, "lk_spark" <lk_sp...@163.com> wrote:

> Thanks Kumar, I want to know how to call a udf with multiple parameters,
> maybe a udf to make a substr function. How can I pass parameters with begin
> and end index? I tried it with errors. Can the udf parameters only be of
> Column type?
>
> 2017-06-16
> --
> lk_spark
> ------
>
> *From:*Pralabh Kumar <pralabhku...@gmail.com>
> *Sent:*2017-06-16 17:49
> *Subject:*Re: how to call udf with parameters
> *To:*"lk_spark"<lk_sp...@163.com>
> *Cc:*"user.spark"<user@spark.apache.org>
>
> sample UDF
> val getlength=udf((data:String)=>data.length())
> data.select(getlength(data("col1")))
>
> On Fri, Jun 16, 2017 at 9:21 AM, lk_spark <lk_sp...@163.com> wrote:
>
>> hi,all
>>  I define a udf with multiple parameters  ,but I don't know how to
>> call it with DataFrame
>>
>> UDF:
>>
>> def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean,
>> minTermLen: Int) =>
>> val terms = HanLP.segment(sentence).asScala
>> .
>>
>> Call :
>>
>> scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
>> :40: error: type mismatch;
>>  found   : Boolean(true)
>>  required: org.apache.spark.sql.Column
>>val output = input.select(ssplit2($"text",true,true,2).as('words))
>>  ^
>> :40: error: type mismatch;
>>  found   : Boolean(true)
>>  required: org.apache.spark.sql.Column
>>val output = input.select(ssplit2($"text",true,true,2).as('words))
>>   ^
>> :40: error: type mismatch;
>>  found   : Int(2)
>>  required: org.apache.spark.sql.Column
>>val output = input.select(ssplit2($"text",true,true,2).as('words))
>>^
>>
>> scala> val output = input.select(ssplit2($"text",$
>> "true",$"true",$"2").as('words))
>> org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given
>> input columns: [id, text];;
>> 'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
>> +- Project [_1#2 AS id#5, _2#3 AS text#6]
>>+- LocalRelation [_1#2, _2#3]
>>
>> I need help!!
>>
>>
>> 2017-06-16
>> --
>> lk_spark
>>
>
>


Re: how to call udf with parameters

2017-06-15 Thread Pralabh Kumar
Sample UDF:

import org.apache.spark.sql.functions.udf

val getlength = udf((data: String) => data.length())
data.select(getlength(data("col1")))

On Fri, Jun 16, 2017 at 9:21 AM, lk_spark  wrote:

> hi,all
>  I define a udf with multiple parameters  ,but I don't know how to
> call it with DataFrame
>
> UDF:
>
> def ssplit2 = udf { (sentence: String, delNum: Boolean, delEn: Boolean,
> minTermLen: Int) =>
> val terms = HanLP.segment(sentence).asScala
> .
>
> Call :
>
> scala> val output = input.select(ssplit2($"text",true,true,2).as('words))
> :40: error: type mismatch;
>  found   : Boolean(true)
>  required: org.apache.spark.sql.Column
>val output = input.select(ssplit2($"text",true,true,2).as('words))
>  ^
> :40: error: type mismatch;
>  found   : Boolean(true)
>  required: org.apache.spark.sql.Column
>val output = input.select(ssplit2($"text",true,true,2).as('words))
>   ^
> :40: error: type mismatch;
>  found   : Int(2)
>  required: org.apache.spark.sql.Column
>val output = input.select(ssplit2($"text",true,true,2).as('words))
>^
>
> scala> val output = input.select(ssplit2($"text",$
> "true",$"true",$"2").as('words))
> org.apache.spark.sql.AnalysisException: cannot resolve '`true`' given
> input columns: [id, text];;
> 'Project [UDF(text#6, 'true, 'true, '2) AS words#16]
> +- Project [_1#2 AS id#5, _2#3 AS text#6]
>+- LocalRelation [_1#2, _2#3]
>
> I need help!!
>
>
> 2017-06-16
> --
> lk_spark
>


Re: featureSubsetStrategy parameter for GradientBoostedTreesModel

2017-06-15 Thread Pralabh Kumar
Hi everyone

Currently GBT doesn't expose featureSubsetStrategy the way Random Forest does.

GradientBoostedTrees in Spark has the feature subset strategy hardcoded to
"all" when calling random forest in DecisionTreeRegressor.scala:

val trees = RandomForest.run(data, oldStrategy, numTrees = 1,
featureSubsetStrategy = "all",


It should let the user set featureSubsetStrategy
("auto", "all", "sqrt", "log2", "onethird"),
the way random forest does.

This will give GBT randomness at the feature level.
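For contrast, a minimal sketch (illustrative only, against the DataFrame-based ML API) of where the knob is already exposed today and where it is missing:

import org.apache.spark.ml.regression.{GBTRegressor, RandomForestRegressor}

// RandomForestRegressor already exposes the strategy:
val rf = new RandomForestRegressor()
  .setFeatureSubsetStrategy("sqrt")   // "auto", "all", "sqrt", "log2", "onethird"

// GBTRegressor has no equivalent setter here, since the underlying
// call is pinned to featureSubsetStrategy = "all":
val gbt = new GBTRegressor()
  .setMaxIter(50)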

Jira SPARK-20199 <https://issues.apache.org/jira/browse/SPARK-20199>

Please let me know if my understanding is correct.

Regards
Pralabh Kumar

On Fri, Jun 16, 2017 at 7:53 AM, Pralabh Kumar <pralabhku...@gmail.com>
wrote:

> Hi everyone
>
> Currently GBT doesn't expose featureSubsetStrategy as exposed by Random
> Forest.
>
> .
> GradientBoostedTrees in Spark have hardcoded feature subset strategy to
> "all" while calling random forest in  DecisionTreeRegressor.scala
>
> val trees = RandomForest.run(data, oldStrategy, numTrees = 1,
> featureSubsetStrategy = "all",
>
>
> It should provide functionality to the user to set featureSubsetStrategy
> ("auto", "all" ,"sqrt" , "log2" , "onethird") ,
> the way random forest does.
>
> This will help GBT to have randomness at feature level.
>
> Jira SPARK-20199 <https://issues.apache.org/jira/browse/SPARK-20199>
>
> Please let me know , if my understanding is correct.
>
> Regards
> Pralabh Kumar
>